Hi,
you can implement that with a ProcessFunction [1].
The ProcessFunction would have to compute counts at some granularity (either event-time or processing-time) of records that are processed by the ProcessFunction in processElement().
I would do this with a MapState that has a timestamp as key and a count as value. The counts should be stored with some resolution, e.g., every 10 seconds (depends on your requirements).
This means you have one count for all elements that arrive within 10 seconds.
The actual count of the last 5 minutes is stored in a ValueState<Long>. and updated in regular intervals using timers.
When updating, you iterate over the MapState and discard all counts that are older than 5 minutes and compute the new count from the remaining counts.
The ValueState is configured to be queryable.
Best, Fabian