Hi,

According to Tyler Akidau's presentation (https://docs.google.com/presentation/d/13YZy2trPugC8Zr9M8_TfSApSCZBGUDZdzi-WUz95JJw/present), Flink supports late arrivals in window processing. However, I've seen several questions on the user list about late arrivals, and the answer was roughly "not for all use cases".

The case that interests me: I'm using event-time processing and want to aggregate over tumbling windows. The events come from Kafka and might arrive late. Currently we define the lateness threshold with the watermark (e.g. 5 minutes). After the window triggers, I want to save the aggregated result to persistent storage (Redis/HBase) with the window's start timestamp.

After this grace period, if I understand correctly, a late event won't be aggregated into the existing window. Instead, the trigger will call the aggregation function with only one element inside (the late one). So if my window function saves to persistent storage, it will overwrite the aggregated result with a new one that contains only that one element.

What I want to achieve is that a late arrival triggers the window function with all elements (the late one plus all the others), so that the aggregated result is complete.

You can think of a use case like page-visit counts per minute, where due to some problem page-visit events might arrive late.

Thanks in advance
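To make the problem in the question concrete, here is a minimal plain-Python model of it. This is not Flink code: `simulate`, `purge_on_fire`, and the dictionary standing in for Redis/HBase are all hypothetical names, and the firing rule (fire once the watermark reaches the window's last timestamp, fire again immediately for a late element) is an assumption based on the behaviour described above.

```python
WINDOW_MS = 60_000  # one-minute tumbling windows, as in the page-visit example


def window_start(ts_ms):
    """Start timestamp of the tumbling window that contains ts_ms."""
    return ts_ms - ts_ms % WINDOW_MS


def simulate(stream, purge_on_fire):
    """Run ("event", ts) / ("watermark", ts) items through a toy window operator.

    Returns the final contents of the store (window start -> count).
    """
    state = {}     # window start -> count buffered in the operator
    fired = set()  # windows that have already fired at least once
    store = {}     # stand-in for Redis/HBase, keyed by window start
    watermark = float("-inf")

    def fire(start):
        store[start] = state[start]  # overwrites any earlier result
        fired.add(start)
        if purge_on_fire:
            del state[start]         # window contents are thrown away

    for kind, ts in stream:
        if kind == "watermark":
            watermark = max(watermark, ts)
            for start in sorted(state):
                if start not in fired and start + WINDOW_MS - 1 <= watermark:
                    fire(start)
        else:
            start = window_start(ts)
            state[start] = state.get(start, 0) + 1
            if start + WINDOW_MS - 1 <= watermark:
                fire(start)          # late element: the window fires again
    return store


stream = [
    ("event", 1_000), ("event", 2_000), ("event", 3_000),
    ("watermark", 60_000),  # window [0, 60000) fires with 3 elements
    ("event", 4_000),       # late element for the same window
]
print(simulate(stream, purge_on_fire=True))   # {0: 1}
print(simulate(stream, purge_on_fire=False))  # {0: 4}
```

With purging, the late element overwrites the stored count of 3 with 1. If the window contents are kept, the late firing writes the complete count of 4, which is the outcome the question asks for.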
Hi Igor,
To handle late events in Flink, you would have to implement your own custom trigger. For a relatively complex example of such a trigger and how to implement it, have a look at this implementation:

https://github.com/dataArtisans/beam_comp/blob/master/src/main/java/com/dataartisans/beam_comparison/customTriggers/EventTimeTriggerWithEarlyAndLateFiring.java

It implements the trigger described in this article (before the conclusions section).

Thanks,
Kostas
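As a rough illustration of what a trigger with late firings has to decide, here is a plain-Python sketch. It is not the linked Java implementation and does not use the Flink API. The `TriggerResult` names loosely mirror Flink's enum of the same name, while `on_element`, `run`, and `allowed_lateness_ms` are hypothetical. The assumption is that window contents are kept until the watermark passes the window's last timestamp plus a grace period, and that each late element within that period re-fires the window with its full contents.

```python
from enum import Enum

WINDOW_MS = 60_000  # one-minute tumbling windows


class TriggerResult(Enum):
    CONTINUE = 1  # keep buffering, emit nothing
    FIRE = 2      # emit the full window contents, keep them
    PURGE = 3     # discard without emitting


def on_element(window_max_ts, watermark, allowed_lateness_ms):
    """Decide what to do when an element arrives for a window."""
    if watermark >= window_max_ts + allowed_lateness_ms:
        return TriggerResult.PURGE     # beyond the grace period: drop it
    if watermark >= window_max_ts:
        return TriggerResult.FIRE      # late but allowed: re-emit everything
    return TriggerResult.CONTINUE      # on time: wait for the watermark


def run(stream, allowed_lateness_ms):
    """Return one (window_start, count) pair per firing."""
    state = {}     # window start -> event timestamps kept for re-firing
    fired = set()  # windows that have had their on-time firing
    emitted = []
    watermark = float("-inf")

    for kind, ts in stream:
        if kind == "watermark":
            watermark = max(watermark, ts)
            for start in sorted(state):
                max_ts = start + WINDOW_MS - 1
                if start not in fired and watermark >= max_ts:
                    emitted.append((start, len(state[start])))  # on-time firing
                    fired.add(start)
                if watermark >= max_ts + allowed_lateness_ms:
                    del state[start]   # grace period over: free the state
            continue

        start = ts - ts % WINDOW_MS
        result = on_element(start + WINDOW_MS - 1, watermark, allowed_lateness_ms)
        if result is TriggerResult.PURGE:
            state.pop(start, None)
            continue
        state.setdefault(start, []).append(ts)
        if result is TriggerResult.FIRE:
            emitted.append((start, len(state[start])))          # late firing
            fired.add(start)
    return emitted


stream = [
    ("event", 1_000), ("event", 2_000), ("event", 3_000),
    ("watermark", 60_000),   # on-time firing with 3 elements
    ("event", 4_000),        # late, within 5 minutes: fires with all 4
    ("watermark", 400_000),  # grace period over, state is dropped
    ("event", 5_000),        # too late: ignored
]
print(run(stream, allowed_lateness_ms=300_000))  # [(0, 3), (0, 4)]
```

Because every firing carries the full window contents, a sink that overwrites by window start timestamp ends up with the complete count. The cost is that state for each window has to be held for the whole grace period.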
|
Thanks, Kosta

On 3 June 2016 at 16:47, Kostas Kloudas <[hidden email]> wrote:
|
You are welcome!
|
Super cool stuff

On Fri, Jun 3, 2016 at 10:55 AM, Kostas Kloudas <[hidden email]> wrote:
|
Hi Igor,

You might be interested in this doc about how we want to improve handling of late data and some other things in the windowing API: https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit?usp=sharing

I've sent it around several times, but you can never know who's aware of it already. :-)

Cheers,
Aljoscha

On Fri, 3 Jun 2016 at 22:02 Michael Tamillow <[hidden email]> wrote:
|
In reply to this post by Kostas Kloudas
On Fri, Jun 3, 2016 at 6:47 AM, Kostas Kloudas <[hidden email]> wrote:
I've modified this trigger so that firings are suppressed unless new events have arrived between timers. This can significantly reduce the number of output events, which could mean far fewer writes to a downstream data store. See https://gist.github.com/eliaslevy/ec840444607b9a5dd5aa3eb2cdd77932.

Also, I find the accumulating behavior somewhat unintuitive: when it is disabled, the trigger only purges when the time window ends. When discarding is in effect, it seems more natural for purging to occur at each firing, whether early, at the window's event-time end, or late. Otherwise, you may end up with output events that have different semantics. E.g. with the current behavior, if you are implementing a counter, early firings will produce partial counts until the window end, and after that late firings will give you the delta from the window-end count. It would be more consistent to generate either partial counts at all firings or deltas at all firings, so that the operator's output can be processed the same way downstream.
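The difference between the purge policies is easier to see with numbers. Below is a small plain-Python sketch of a counter. It is not taken from the gist or from the original trigger: `firings`, `purge`, and `suppress_empty` are hypothetical names, and the "window_end" mode (purge once, at the window-end timer) is one reading of the current behavior described above.

```python
def firings(new_events_per_timer, window_end_index, purge, suppress_empty=True):
    """Return the count emitted at each firing of a toy counting window.

    new_events_per_timer: number of events that arrived before each timer.
    window_end_index:     index of the timer that marks the window end.
    purge:                "never" (accumulating), "window_end", or "every_firing".
    suppress_empty:       skip a firing if nothing arrived since the last timer.
    """
    emitted = []
    buffered = 0
    for i, new in enumerate(new_events_per_timer):
        buffered += new
        if new > 0 or not suppress_empty:
            emitted.append(buffered)
        if purge == "every_firing" or (purge == "window_end" and i == window_end_index):
            buffered = 0  # discard what has been counted so far
    return emitted


# Two early timers, the window-end timer, then one late timer.
arrivals = [2, 0, 3, 1]

print(firings(arrivals, 2, purge="never"))         # [2, 5, 6]  running totals
print(firings(arrivals, 2, purge="window_end"))    # [2, 5, 1]  totals, then a delta
print(firings(arrivals, 2, purge="every_firing"))  # [2, 3, 1]  deltas throughout
print(firings(arrivals, 2, purge="never", suppress_empty=False))  # [2, 2, 5, 6]
```

The "window_end" row mixes running totals with a delta, so a downstream consumer cannot treat every output the same way. The other two rows are each consistent: overwrite with the latest value for "never", or add up the values for "every_firing". The last line shows the redundant second firing that suppression removes.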