Hi Community,
I'm doing stream processing in event time, and I want to preserve ordering and keep late events. I have a use case where, for every incoming event, I need to fire an aggregation function over the events of the last n seconds (or, more generally, n time units).

It seems to me that windowing is not suitable, since a window can be defined either by time or by event count, but not as "the last n seconds for each single event".

Is there an idiomatic way to do this? Any examples or help would be appreciated. Thanks in advance.

Best regards,
Oleg Bonar
Can you use RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW?

On Tue, Feb 11, 2020 at 12:15 PM oleg <[hidden email]> wrote:
> Hi Community, [...]
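For reference, in Flink SQL this frame clause goes inside an OVER clause that is partitioned by the key and ordered by the event-time attribute, and the query then emits one result row per input row, which is the "fire for every incoming event" behaviour asked for. The semantics of the frame can be pinned down with a small plain-Java reference model for a single partition (not Flink code; the class name, the long values and the use of SUM are illustrative assumptions): each event's result aggregates every event whose timestamp lies in [t - range, t], including other events that carry the same timestamp, because RANGE frames treat them as peers of the current row.

```java
import java.util.Arrays;

/**
 * Reference model (not Flink code) of an OVER window with
 * RANGE BETWEEN <range> PRECEDING AND CURRENT ROW for one partition (key).
 * Each event's result is the sum of all values whose timestamp t' satisfies
 * t - range <= t' <= t; events sharing the current timestamp (peers) count too.
 * SUM is an illustrative choice of aggregate.
 */
final class RangeOverWindowModel {

    private RangeOverWindowModel() {}

    /**
     * @param timestamps event timestamps in milliseconds, in any order
     * @param values     values to sum, aligned with timestamps
     * @param rangeMs    preceding range, e.g. 1000 for INTERVAL '1' SECOND
     * @return one sum per event, in input order
     */
    static long[] rangeSums(long[] timestamps, long[] values, long rangeMs) {
        int n = timestamps.length;
        // Sort event indices by timestamp so each range becomes a contiguous slice.
        Integer[] order = new Integer[n];
        for (int i = 0; i < n; i++) {
            order[i] = i;
        }
        Arrays.sort(order, (a, b) -> Long.compare(timestamps[a], timestamps[b]));

        long[] sortedTs = new long[n];
        long[] prefix = new long[n + 1]; // prefix[k] = sum of the first k values in timestamp order
        for (int k = 0; k < n; k++) {
            sortedTs[k] = timestamps[order[k]];
            prefix[k + 1] = prefix[k] + values[order[k]];
        }

        long[] result = new long[n];
        for (int i = 0; i < n; i++) {
            long t = timestamps[i];
            int from = countBelow(sortedTs, t - rangeMs, true); // first event with ts >= t - range
            int to = countBelow(sortedTs, t, false);            // one past the last event with ts <= t
            result[i] = prefix[to] - prefix[from];
        }
        return result;
    }

    // Binary search: number of sorted timestamps that are < key (strict) or <= key (non-strict).
    private static int countBelow(long[] sorted, long key, boolean strict) {
        int lo = 0;
        int hi = sorted.length;
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            boolean below = strict ? sorted[mid] < key : sorted[mid] <= key;
            if (below) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        return lo;
    }
}
```

For example, with a range of 1000 ms, an event at t = 1000 aggregates everything in [0, 1000], so two events that both carry timestamp 1000 get the same result.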
Hi Oleg,
Could you be more specific about what you mean by "for events of last n seconds (time units in general) for every incoming event"?

Do you mean that you have a stream with parallelism 1, and for each incoming element you want your function to fire with the element itself and all the elements that arrived within the last N time units as input? If so, you can key your stream by a dummy key to get access to keyed state. Then use a MapState whose key is the timestamp and whose value is the list of already-seen elements with that timestamp, and whenever an element arrives, register a timer to fire N time units in the future. When the timer fires, you can iterate over the map, fetch the elements you are interested in, and clean up whatever you will no longer need.

For an example, you could look at [1].

I hope this helps,
Kostas

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html

On Tue, Feb 11, 2020 at 11:18 PM Fanbin Bu <[hidden email]> wrote:
> [...]
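A minimal sketch of that recipe, modelled in plain Java so it runs without Flink: a TreeMap<Long, List<Long>> stands in for the MapState keyed by timestamp, a sorted set of pending timestamps stands in for the event-time timer service of a single (dummy) key, and advanceWatermark stands in for watermark progress. The class and method names and the SUM aggregation are illustrative assumptions, and one detail deliberately differs from the recipe as written: the timer is registered at the element's own timestamp, so it fires once the watermark guarantees that every on-time element in [ts - N, ts] has arrived, and the same callback both emits the result and evicts entries that no later timer can need. Late elements are simply dropped here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.TreeSet;

/** Plain-Java model (no Flink) of one key of a timer-driven "sum over [ts - n, ts]" operator. */
final class TimerBufferedRangeSum {

    /** One output record: the element and the sum over its range. */
    static final class Emitted {
        final long timestamp;
        final long value;
        final long windowSum;

        Emitted(long timestamp, long value, long windowSum) {
            this.timestamp = timestamp;
            this.value = value;
            this.windowSum = windowSum;
        }
    }

    private final long n;
    // Stand-in for MapState<Long, List<Long>>: timestamp -> values seen at that timestamp.
    private final TreeMap<Long, List<Long>> buffer = new TreeMap<>();
    // Stand-in for the event-time timer service; a set, so duplicate registrations collapse.
    private final TreeSet<Long> timers = new TreeSet<>();
    private long currentWatermark = Long.MIN_VALUE;

    TimerBufferedRangeSum(long n) {
        if (n < 0) {
            throw new IllegalArgumentException("n must be >= 0");
        }
        this.n = n;
    }

    /** Buffers an element and registers its timer; returns false if the element is late. */
    boolean processElement(long ts, long value) {
        if (ts <= currentWatermark) {
            return false; // late: dropped in this sketch
        }
        buffer.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);
        // Assumption: timer at the element's own timestamp (the recipe suggests ts + N).
        timers.add(ts);
        return true;
    }

    /** Advances event time and fires every due timer in ascending timestamp order. */
    List<Emitted> advanceWatermark(long watermark) {
        List<Emitted> out = new ArrayList<>();
        currentWatermark = Math.max(currentWatermark, watermark);
        while (!timers.isEmpty() && timers.first() <= currentWatermark) {
            onTimer(timers.pollFirst(), out);
        }
        return out;
    }

    private void onTimer(long timerTs, List<Emitted> out) {
        // The watermark has passed timerTs, so every on-time element in [timerTs - n, timerTs] is here.
        long sum = 0;
        for (List<Long> values : buffer.subMap(timerTs - n, true, timerTs, true).values()) {
            for (long v : values) {
                sum += v;
            }
        }
        for (long v : buffer.get(timerTs)) {
            out.add(new Emitted(timerTs, v, sum));
        }
        // Later timers fire at T > timerTs and read only [T - n, T], so entries <= timerTs - n can go.
        buffer.headMap(timerTs - n, true).clear();
    }
}
```

Because eviction happens only when a later timer fires, a key that stops receiving events keeps its last entries; a clean-up timer N units after each element, as suggested above, is one way to release that state. Emitting from the timer also means results leave the operator in timestamp order, at the cost of waiting for the watermark.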
Hi Oleg,
With the MapState approach you can always fire on every incoming element :) You just iterate over the map state and find all the elements whose timestamp (the map key) lies between NOW-N and NOW, where NOW is the timestamp of the current element.

Anyway, if Fanbin's solution works, then you can always use that!

Cheers,
Kostas

On Wed, Feb 12, 2020 at 7:18 PM Олег Бонарь <[hidden email]> wrote:
> Hi Kostas,
>
> Thanks for your reply!
> Yes, you understood me correctly. However, I also want the stream to be keyed so that it can be processed in parallel. I'm afraid the MapState approach you suggested doesn't really suit my use case, because I need to fire on every incoming event.
> Logically, Fanbin's "RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW" looks 100% like what I need, but I haven't tried it yet.
> I'm also wondering whether it can be expressed in the DataStream API.
>
> On Wed, Feb 12, 2020 at 13:06, Kostas Kloudas <[hidden email]> wrote:
>> [...]
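The per-element variant Kostas describes can be modelled the same way. This plain-Java sketch (again no Flink dependency; the class name, method names and SUM aggregation are hypothetical) keeps one timestamp-keyed map per key, to mirror a keyed stream processed in parallel, and on each element sums the values whose timestamps fall in [NOW - N, NOW] through a range view of the sorted map instead of scanning every entry. Eviction is driven here by an explicit onWatermark call; a real KeyedProcessFunction has no per-watermark callback, so there it would typically be driven by event-time timers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;

/** Plain-Java model (no Flink): each on-time element immediately gets its key's sum over [ts - n, ts]. */
final class PerElementRangeSum {
    private final long n;
    // key -> (timestamp -> values); one map per key stands in for keyed MapState<Long, List<Long>>.
    private final Map<String, TreeMap<Long, List<Long>>> state = new HashMap<>();
    private long currentWatermark = Long.MIN_VALUE;

    PerElementRangeSum(long n) {
        if (n < 0) {
            throw new IllegalArgumentException("n must be >= 0");
        }
        this.n = n;
    }

    /** Returns the window sum for this element, or empty if the element is late. */
    OptionalLong processElement(String key, long ts, long value) {
        if (ts <= currentWatermark) {
            return OptionalLong.empty(); // late: part of its range may already be evicted
        }
        TreeMap<Long, List<Long>> byTs = state.computeIfAbsent(key, k -> new TreeMap<>());
        byTs.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);
        long sum = 0;
        // Visit only entries with timestamps in [ts - n, ts], including the current element.
        for (List<Long> values : byTs.subMap(ts - n, true, ts, true).values()) {
            for (long v : values) {
                sum += v;
            }
        }
        return OptionalLong.of(sum);
    }

    /** Advances event time and drops entries that no future on-time element can need. */
    void onWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        if (currentWatermark < Long.MIN_VALUE + n) {
            return; // nothing can be evicted yet (also avoids underflow below)
        }
        // A future on-time element has ts > watermark, so it reads only timestamps > watermark - n.
        long horizon = currentWatermark - n;
        Iterator<TreeMap<Long, List<Long>>> it = state.values().iterator();
        while (it.hasNext()) {
            TreeMap<Long, List<Long>> byTs = it.next();
            byTs.headMap(horizon, true).clear();
            if (byTs.isEmpty()) {
                it.remove(); // no buffered events left for this key
            }
        }
    }

    /** Number of distinct buffered timestamps for a key (for inspection only). */
    int bufferedTimestamps(String key) {
        TreeMap<Long, List<Long>> byTs = state.get(key);
        return byTs == null ? 0 : byTs.size();
    }
}
```

For example, feeding key "a" the events 0 → 1, 500 → 2 and 1000 → 3 returns 1, 3 and 6, while events under another key never contribute. The trade-off compared with firing on a timer is that a result is emitted as soon as its element arrives, so an element that arrives out of order (but not late) is missing from results already emitted for later timestamps; if every on-time event must be accounted for, the timer-driven approach from Kostas's first reply, which waits for the watermark, fits better.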