Event processing time with lateness

igor.berman

Hi 

According to Tyler Akidau's presentation (https://docs.google.com/presentation/d/13YZy2trPugC8Zr9M8_TfSApSCZBGUDZdzi-WUz95JJw/present), Flink supports late arrivals for window processing. However, I've seen several questions on the user list about late arrivals where the answer was, roughly, "not for all use cases".
Can somebody clarify?

The interesting case for me: I am using event-time processing and want to aggregate over tumbling windows. The events come from Kafka and might arrive late. Currently we define the lateness threshold with the watermark (e.g. 5 minutes).

After the window triggers, I want to save the aggregated result to some persistent storage (Redis/HBase) along with the start timestamp of the window.

After this grace period, if I understand correctly, a late event won't be aggregated into the existing window; instead, the trigger will call the aggregation function with only one element inside (the late one).

So if my window function saves into persistent storage, it will overwrite the aggregated result with a new one that contains only that one element.

What I want to achieve is that a late arrival triggers the window function with all elements (the late one plus all the others), so that the aggregated result is complete.

As a use case, think of page visit counts per minute, where page visit events might arrive late due to some problem.
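To make the desired behavior concrete, here is a minimal plain-Python sketch of the page-visit counter. It is not Flink code: the class name, the `store` dict standing in for Redis/HBase, the constants, and the simplified firing rule (a window fires once the watermark reaches its end) are all assumptions for illustration. Window state is kept for a grace period after the window ends, so a late event re-emits the complete count rather than a count of one.

```python
WINDOW_MS = 60_000             # one-minute tumbling windows
ALLOWED_LATENESS_MS = 300_000  # hypothetical 5-minute grace period


class AccumulatingWindowCounter:
    """Toy model: counts events per tumbling window and re-fires on late data."""

    def __init__(self):
        self.counts = {}                 # window start -> count, kept until expiry
        self.watermark = float("-inf")   # event-time progress seen so far
        self.store = {}                  # stand-in for Redis/HBase, keyed by window start

    def on_event(self, ts):
        """Add one event; returns False if it is too late and gets dropped."""
        start = ts - ts % WINDOW_MS
        end = start + WINDOW_MS
        if end + ALLOWED_LATENESS_MS <= self.watermark:
            return False                 # window state was already cleaned up
        self.counts[start] = self.counts.get(start, 0) + 1
        if end <= self.watermark:
            # Late firing: overwrite the stored result with the COMPLETE count.
            self.store[start] = self.counts[start]
        return True

    def on_watermark(self, wm):
        """Advance the watermark, fire windows that just ended, expire old state."""
        wm = max(wm, self.watermark)     # watermarks never move backwards
        for start, count in self.counts.items():
            if self.watermark < start + WINDOW_MS <= wm:
                self.store[start] = count    # on-time firing
        self.watermark = wm
        expired = [s for s in self.counts
                   if s + WINDOW_MS + ALLOWED_LATENESS_MS <= wm]
        for s in expired:
            del self.counts[s]
```

Because every firing writes the full count under the same key, the write to the store behaves like an upsert, and a late event only ever increases the stored value. The cost is that window state has to stay around for the whole grace period.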

thanks in advance


Re: Event processing time with lateness

Kostas Kloudas
Hi Igor,

To handle late events in Flink you would have to implement your own custom trigger.

For a relatively more complex example of such a trigger and how to implement it, see this implementation, which implements the trigger described in this article (before the conclusions section).

Thanks,
Kostas

Re: Event processing time with lateness

igor.berman
Thanks, Kostas

Re: Event processing time with lateness

Kostas Kloudas
You are welcome!

Re: Event processing time with lateness

Michael Tamillow
Super cool stuff

Re: Event processing time with lateness

Aljoscha Krettek
Hi Igor,
you might be interested in this doc about how we want to improve handling of late data and some other things in the windowing API: https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit?usp=sharing

I've sent it around several times but you can never know who's aware of it already. :-)

Cheers,
Aljoscha

Re: Event processing time with lateness

Elias Levy
In reply to this post by Kostas Kloudas

On Fri, Jun 3, 2016 at 6:47 AM, Kostas Kloudas <[hidden email]> wrote:
To see a relatively more complex example of such a trigger and how to implement it,

I've modified this trigger so that firings are suppressed unless new events have arrived between timers.  This can significantly reduce the number of emitted events, which could mean far fewer writes to a downstream data store.  See https://gist.github.com/eliaslevy/ec840444607b9a5dd5aa3eb2cdd77932.
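The suppression idea can be sketched in a few lines of plain Python. This is not the code from the gist and not the Flink Trigger API; the class and method names are hypothetical. A "dirty" flag records whether anything arrived since the last firing, and a timer that finds the flag unset emits nothing.

```python
class SuppressingPeriodicCounter:
    """Toy model of a periodic firing that is skipped when nothing changed."""

    def __init__(self):
        self.count = 0      # accumulated window contents (here: just a count)
        self.dirty = False  # True if an element arrived since the last firing

    def on_element(self):
        self.count += 1
        self.dirty = True

    def on_timer(self):
        """Return the current count, or None if the firing is suppressed."""
        if not self.dirty:
            return None     # no new events between timers: skip the firing
        self.dirty = False
        return self.count
```

With a downstream store that is written on every firing, each suppressed firing is one write saved.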

Also, I find the accumulating behavior somewhat unintuitive: when it is disabled, the trigger only purges when the time window ends.  When discarding is in effect, it seems more natural for purging to occur at each firing, whether early, at the window's event-time end, or late.  Otherwise, you may end up with output events of different semantics.  E.g., with the current behavior, if you are implementing a counter, early firings will result in partial counts until the window end; after that, late firings will give you partial counts of the delta from the window-end count.  It would be more consistent to generate either partial counts at all firings or deltas at all firings, so that the output of the operator can be processed the same way downstream.
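A small plain-Python illustration of the three output semantics for a counter. The function and mode names are made up for this sketch, and the "purge_at_window_end" mode is only one reading of the behavior described above, modelled as a single purge at the on-time firing.

```python
def firing_outputs(new_per_firing, on_time_index, mode):
    """Return the count emitted at each firing.

    new_per_firing: number of new elements that arrived before each firing
    on_time_index:  index of the firing at the window's event-time end
    mode:           "accumulating", "discarding" or "purge_at_window_end"
    """
    if mode not in ("accumulating", "discarding", "purge_at_window_end"):
        raise ValueError("unknown mode: " + mode)
    outputs = []
    state = 0
    for i, n in enumerate(new_per_firing):
        state += n
        outputs.append(state)
        if mode == "discarding":
            state = 0                      # purge at every firing: pure deltas
        elif mode == "purge_at_window_end" and i == on_time_index:
            state = 0                      # purge once, at the window end
    return outputs


# One early firing, the on-time firing, then two late firings.
arrivals = [3, 2, 1, 4]
accumulating = firing_outputs(arrivals, 1, "accumulating")      # [3, 5, 6, 10]
discarding = firing_outputs(arrivals, 1, "discarding")          # [3, 2, 1, 4]
mixed = firing_outputs(arrivals, 1, "purge_at_window_end")      # [3, 5, 1, 5]
```

In the mixed case the first two outputs are running totals while the last two are counts relative to the window end, so a downstream consumer cannot treat all outputs the same way. The other two modes each give one consistent meaning to every output.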