Aggregation for last n seconds for each event


oleg
Hi Community,

I am doing stream processing in event time and want to preserve ordering and
handle late events. I have a use case where, for every incoming event, I need
to fire an aggregation function over the events of the last n seconds (or,
more generally, the last n time units).

It seems to me that windowing is not suitable, since a window can be defined
either by time or by event count, but not as "the last n seconds for each
single event".

Is there an idiomatic way to do this? Any examples or help are
appreciated. Thanks in advance.


Best regards,

Oleg Bonar


Re: Aggregation for last n seconds for each event

Fanbin Bu
Can you use
RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW?
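For reference, a clause like this goes inside a Flink SQL OVER window ordered by an event-time attribute, and an OVER aggregation produces one output row per input row, which matches "one aggregate per incoming event". Below is a plain-Java sketch (no Flink dependency) of what such a window computes for a single partition. It assumes the lower bound of the range is inclusive, as in standard SQL, and that the events are already sorted by timestamp (in Flink, ordering by rowtime is handled by the runtime). The query in the comment, its table and column names (events, user_id, amount, rowtime), and the class RangeOverWindowModel are all hypothetical.

```java
/**
 * Plain-Java model of a streaming OVER aggregation such as this hypothetical query:
 *
 *   SELECT user_id, rowtime,
 *          SUM(amount) OVER (
 *            PARTITION BY user_id
 *            ORDER BY rowtime
 *            RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW)
 *   FROM events
 *
 * The model covers a single partition (one user_id). Every input row yields one output row.
 */
final class RangeOverWindowModel {

    /**
     * For each event i, returns the sum of the amounts of all events whose timestamp lies in
     * [ts[i] - rangeMillis, ts[i]]. Events sharing ts[i] are peers under RANGE semantics and
     * are all included. Assumes ts is sorted in ascending order.
     */
    static long[] rangeSums(long[] ts, long[] amounts, long rangeMillis) {
        long[] out = new long[ts.length];
        int lo = 0;          // index of the oldest event still inside the current range
        int hi = 0;          // one past the last event with timestamp <= current timestamp
        long windowSum = 0;  // running sum of amounts[lo..hi-1]
        for (int i = 0; i < ts.length; i++) {
            // Add every event up to and including the current row's peers.
            while (hi < ts.length && ts[hi] <= ts[i]) {
                windowSum += amounts[hi++];
            }
            // Evict events older than the lower bound of the range.
            while (ts[lo] < ts[i] - rangeMillis) {
                windowSum -= amounts[lo++];
            }
            out[i] = windowSum;
        }
        return out;
    }
}
```

For example, with timestamps 0, 500, 500, 1200 and 2500 ms, amounts 1, 2, 4, 8 and 16, and a 1000 ms range, the per-event sums are 1, 7, 7, 14 and 16. The PARTITION BY clause in the comment would give each key its own independent range, which is how the SQL version can stay keyed and parallel.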

In reply to oleg (Tue, Feb 11, 2020 at 12:15 PM)

Re: Aggregation for last n seconds for each event

Kostas Kloudas-2
Hi Oleg,

Could you be more specific about what you mean by
"for events of last n seconds (time units in general) for every incoming event"?

Do you mean that you have a stream with parallelism 1 and, for each
incoming element, you want your function to fire with the event itself
and all the events that arrived within the last N time units as input?
If this is the case, you can key your stream by a dummy key to get
access to keyed state, and then use MapState with the timestamp as the
key and, as the value, the list of already-seen elements with that
timestamp. Whenever an element arrives, you can register a timer to
fire N time units in the future. Then, when the timer fires, you can
iterate over the map, fetch the elements you are interested in, and
clean up whatever you no longer need.
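A minimal plain-Java sketch of this state layout, under one possible reading of it: the aggregate reported when a timer fires at time T covers the timestamps in [T - N, T], and entries that no later timer can reach are then cleaned up. The TreeMap stands in for MapState<Long, List<...>> and the TreeSet for the event-time timer service (Flink keeps a single timer per key and timestamp, which the set mimics). A sum is used as the example aggregate, and the class and method names (TimerDrivenBuffer, onElement, advanceWatermark) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical model of one key's state: timestamp -> values seen with that timestamp. */
final class TimerDrivenBuffer {
    private final long n;                                             // window length N in ms
    private final TreeMap<Long, List<Long>> state = new TreeMap<>();  // stands in for MapState
    private final TreeSet<Long> timers = new TreeSet<>();             // pending event-time timers

    TimerDrivenBuffer(long n) {
        this.n = n;
    }

    /** Like processElement: buffer the value under its timestamp, register a timer N later. */
    void onElement(long ts, long value) {
        state.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);
        timers.add(ts + n);
    }

    /** Fires every timer <= watermark in order; each firing sums the values in [T - N, T]. */
    List<Long> advanceWatermark(long watermark) {
        List<Long> results = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= watermark) {
            long t = timers.pollFirst();
            long sum = 0;
            for (List<Long> values : state.subMap(t - n, true, t, true).values()) {
                for (long v : values) {
                    sum += v;
                }
            }
            results.add(sum);
            // Every later timer T' > T only looks back to T' - N > T - N, so these entries are dead.
            state.headMap(t - n, true).clear();
        }
        return results;
    }

    /** Number of distinct timestamps still buffered. */
    int bufferedTimestamps() {
        return state.size();
    }
}
```

With N = 1000 ms, elements at 0, 500 and 1200 ms carrying values 1, 2 and 4, and the watermark advanced to 2500 ms, the timers at 1000, 1500 and 2200 ms report 3, 6 and 4, and the buffer ends up empty. Note that this emits once per timer rather than once per incoming element, which is the limitation raised later in the thread.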

For an example you could look at [1].

I hope this helps,
Kostas

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html

In reply to Fanbin Bu (Tue, Feb 11, 2020 at 11:18 PM)


Re: Aggregation for last n seconds for each event

Kostas Kloudas-2
Hi Oleg,

With the MapState approach you can always fire on every
incoming element :)
You just iterate over the map state and find all the elements whose
timestamp (the map key) lies between NOW - N and the timestamp of the
current element (NOW).
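A minimal plain-Java sketch of firing on every element, using a sum as the example aggregate. The outer HashMap stands in for keyed state: Flink scopes MapState to the current key, so the same logic should also work with real keys instead of a dummy one, which is what keyed parallel processing needs. Because MapState offers no range lookup, the sketch scans all entries and filters on the timestamp key, as described above. PerElementAggregator and onElement are hypothetical names, and cleanup of old entries (for example with the timer approach from the previous reply) is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model: per key, a map from timestamp to the values seen with that timestamp. */
final class PerElementAggregator {
    private final long n;  // look-back length N in ms
    private final Map<String, Map<Long, List<Long>>> stateByKey = new HashMap<>();

    PerElementAggregator(long n) {
        this.n = n;
    }

    /** Like processElement: store the value, then sum this key's values with ts in [now - N, now]. */
    long onElement(String key, long now, long value) {
        Map<Long, List<Long>> state = stateByKey.computeIfAbsent(key, k -> new HashMap<>());
        state.computeIfAbsent(now, k -> new ArrayList<>()).add(value);
        long sum = 0;
        // No range query on MapState, so scan every entry and filter by its timestamp key.
        for (Map.Entry<Long, List<Long>> e : state.entrySet()) {
            long ts = e.getKey();
            if (ts >= now - n && ts <= now) {
                for (long v : e.getValue()) {
                    sum += v;
                }
            }
        }
        return sum;
    }
}
```

With N = 1000 ms and key "a" receiving values 1, 2, 4 and 8 at timestamps 0, 500, 1200 and then 1100 (out of order), the emitted sums are 1, 3, 6 and 10, and a different key is aggregated independently. The result emitted for 1200 does not include the element at 1100 because it had not arrived yet; waiting for the watermark, as in the timer-based approach, avoids that at the cost of latency.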

Anyway, if Fanbin's solution works, then you can always use that!

Cheers,
Kostas

On Wed, Feb 12, 2020 at 7:18 PM Олег Бонарь <[hidden email]> wrote:

>
> Hi Kostas,
>
> Thanks for your reply!
> Yes, you understood me correctly. However, I also want the stream to be keyed so that it can be processed in parallel. I'm afraid the MapState approach you suggested doesn't really suit my use case, because I need to fire on every incoming event.
> Logically, Fanbin's "RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW" looks 100% like what I need, but I haven't tried it yet.
> I'm also wondering whether it can be expressed in the DataStream API.
>
> In reply to Kostas Kloudas (Wed, Feb 12, 2020 at 13:06)