CountTrigger FIRE or FIRE_AND_PURGE


Paul Joireman

Hi all,


I'm attempting to use a long SlidingEventTimeWindows window (duration 24 hours), but I would like updates more frequently than every 24 hours. I naively attempted to use a simple CountTrigger.of(10) to get the window every time 10 samples are collected. However, the window processing function I'm using only seems to get the latest 10 elements, not the whole window (which is what I was hoping for). The code looks like it simply fires after the count is reached, but it behaves as if it were doing a FIRE_AND_PURGE: I can't seem to use the iterator in the window processing function to get more than 10 elements at a time. Is there something I'm missing in order to get at the full content of the window?


Paul
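A minimal plain-Java model of the difference being asked about, with no Flink dependency: one window's buffer under a trigger that fires every 10 elements. The class `PurgeModel` and its `Action` enum are hypothetical stand-ins for the semantics of Flink's `TriggerResult.FIRE` and `TriggerResult.FIRE_AND_PURGE`, not Flink's API. With FIRE the contents stay in the window, so each firing sees everything collected so far; with FIRE_AND_PURGE the contents are dropped after each firing, which would produce the "only the latest 10" symptom.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of one window's buffer under a count-based trigger.
// Action is a hypothetical stand-in for Flink's TriggerResult, not Flink's API.
final class PurgeModel {
    enum Action { FIRE, FIRE_AND_PURGE }

    // Feeds `total` elements into one window and fires every `every` elements.
    // Returns how many elements the window function would see at each firing.
    static List<Integer> firingSizes(int total, int every, Action action) {
        List<Integer> buffer = new ArrayList<>();
        List<Integer> seen = new ArrayList<>();
        int count = 0;
        for (int i = 0; i < total; i++) {
            buffer.add(i);                // the window stores the element
            count++;
            if (count == every) {         // count reached: the trigger fires
                count = 0;                // the trigger's own counter resets in both cases
                seen.add(buffer.size());  // the window function iterates the whole buffer
                if (action == Action.FIRE_AND_PURGE) {
                    buffer.clear();       // purging drops the contents after firing
                }
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("FIRE:           " + firingSizes(30, 10, Action.FIRE));
        System.out.println("FIRE_AND_PURGE: " + firingSizes(30, 10, Action.FIRE_AND_PURGE));
    }
}
```

With 30 elements, FIRE yields window sizes 10, 20 and 30, while FIRE_AND_PURGE yields 10 each time.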


Re: CountTrigger FIRE or FIRE_AND_PURGE

Fabian Hueske-2
Hi Paul,

This blog post [1] includes an example of an early trigger that should pretty much do what you are looking for.
This one [2] explains the windowing mechanics of Flink (window assigner, trigger, function, etc.).

Hope this helps,
Fabian

2016-08-30 0:25 GMT+02:00 Paul Joireman <[hidden email]>:




Re: CountTrigger FIRE or FIRE_AND_PURGE

Paul Joireman

Fabian,


Thanks for the reference. I think I was misinterpreting the results I was getting with the CountTrigger; it looks like it does keep the data. However, I'm running into some behavior that is unexpected (at least to me). Given a keyed data stream keyedStream and event time:


    final DataStream<MyMessageOut> alertingMsgs = keyedStream
                .window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10)))
                .trigger(CountTrigger.of(1))
                .apply(new MyWindowProcessor());

Every time a new element comes in, I expected (probably naively) one firing of the window, but I get 5, presumably due to the sliding windows, although this may depend on the timestamp extraction "policy"; I used a BoundedOutOfOrdernessTimestampExtractor(Time.minutes(1)). Is there a way in the window processing function to determine which particular sliding window is being processed?

Alternatively, a TumblingEventTimeWindows window as below fires only once per incoming element, but with the default trigger replaced by a CountTrigger, my understanding is that the previous windows will not be purged. Is that correct?

    final DataStream<MyMessageOut> alertingMsgs = keyedStream
                .window(TumblingEventTimeWindows.of(Time.minutes(1)))
                .trigger(CountTrigger.of(1))
                .apply(new MyWindowProcessor());

Paul


From: Fabian Hueske <[hidden email]>
Sent: Monday, August 29, 2016 5:46:06 PM
To: [hidden email]
Subject: Re: CountTrigger FIRE or FIRE_AND_PURGE
 



Re: CountTrigger FIRE or FIRE_AND_PURGE

Fabian Hueske-2
Hi Paul,

Sorry for the delayed reply.

I think a CountTrigger won't give you the expected result. When you call trigger(), you replace the existing trigger. In the case of a Sliding/TumblingEventTimeWindow, the trigger that fires at the end of the window is replaced by a trigger that fires every 10 elements. So your window function will not be called when the 24-hour window ends.
You need to implement a custom trigger, similar to the one in the blog post. I think you only need to modify the example code such that it does not sum an attribute (passengerCount) but rather counts how often onElement() has been invoked.
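The behavior described here (fire early every N elements, but still fire once when the event-time window ends) can be modeled in plain Java without a Flink dependency. The class `EarlyCountTriggerModel` and its `onElement`/`onWatermark` methods are hypothetical names, not Flink's API. In a real Flink job this logic would go into a custom `Trigger` subclass whose `onElement()` counts elements and returns `TriggerResult.FIRE`, and which registers an event-time timer for the window's `maxTimestamp()`, as Flink's built-in `EventTimeTrigger` does. The sketch assumes the early firings do not purge, so the final firing still sees every element.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of an early-firing event-time trigger for ONE window.
// Hypothetical sketch; not Flink's Trigger API.
final class EarlyCountTriggerModel {
    private final long maxTimestamp;  // last timestamp in the window (end - 1)
    private final int every;          // fire early after this many new elements
    private int sinceLastFire = 0;    // the trigger's own counter
    private int windowSize = 0;       // elements buffered in the window (never purged)
    private boolean finalFired = false;
    final List<String> firings = new ArrayList<>();

    EarlyCountTriggerModel(long windowEnd, int every) {
        this.maxTimestamp = windowEnd - 1;
        this.every = every;
    }

    // Element arrives: buffer it and FIRE (without purging) every `every` elements.
    void onElement() {
        windowSize++;
        sinceLastFire++;
        if (sinceLastFire >= every) {
            sinceLastFire = 0;
            firings.add("early:" + windowSize);
        }
    }

    // Watermark advances: FIRE once more when it reaches the end of the window.
    void onWatermark(long watermark) {
        if (!finalFired && watermark >= maxTimestamp) {
            finalFired = true;
            firings.add("final:" + windowSize);
        }
    }

    public static void main(String[] args) {
        EarlyCountTriggerModel t = new EarlyCountTriggerModel(60_000L, 10);
        for (int i = 0; i < 25; i++) {
            t.onElement();
        }
        t.onWatermark(30_000L);  // window not complete yet: no firing
        t.onWatermark(59_999L);  // end of window reached: final firing
        System.out.println(t.firings);
    }
}
```

For 25 elements in a window ending at 60 s, the firings are `early:10`, `early:20` and `final:25`.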

Regarding the CountTrigger firing several times per element on sliding windows but only once on tumbling windows: with a tumbling window, each record is inserted into exactly one window. With sliding windows, each element is inserted into multiple windows (size / slide of them, so there should be six for a SlidingWindow(1 minute, 10 seconds)). The CountTrigger fires each window individually.
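A sketch of the window assignment arithmetic, assuming epoch-aligned windows (no offset) and millisecond timestamps; `WindowAssignment` and the sample timestamp are hypothetical. It shows one element at 1:05 landing in six sliding windows of 1 minute sliding by 10 seconds, but in exactly one tumbling window of 1 minute.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of event-time window assignment (epoch-aligned, no offset).
final class WindowAssignment {
    // Start timestamps of all sliding windows [start, start + size) that contain
    // `timestamp`, with a new window beginning every `slide` ms.
    static List<Long> slidingStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    // A tumbling window of `size` ms: exactly one window contains `timestamp`.
    static long tumblingStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    public static void main(String[] args) {
        long ts = 65_000L;  // hypothetical event timestamp: 1 min 5 s
        List<Long> starts = slidingStarts(ts, 60_000L, 10_000L);
        System.out.println(starts.size() + " sliding windows: " + starts);
        System.out.println("tumbling window start: " + tumblingStart(ts, 60_000L));
    }
}
```

With a CountTrigger.of(1), each of those six sliding windows is evaluated and fired separately, hence several firings per element.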

When using a time window, the WindowFunction receives two parameters that identify the window: 1) the key and 2) the Window object. For a time window, the Window object is a TimeWindow that provides the start and end timestamps.
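To illustrate how the key and the window bounds tell the firings apart, here is a plain-Java model (not Flink's API) of a count-of-1 trigger on 1-minute windows sliding by 10 seconds. `WhichWindow` and `describe` are hypothetical names; `describe` plays the role of the window function, reporting the key, the window's [start, end) and how many elements that window holds. Contents are kept per window start, so the same element is counted separately in each window it belongs to.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model, not Flink's API: sliding windows with a count-of-1 trigger.
final class WhichWindow {
    private final Map<Long, List<Long>> contentsByWindowStart = new HashMap<>();
    private final String key;
    private final long size;   // window length in ms
    private final long slide;  // slide in ms

    WhichWindow(String key, long size, long slide) {
        this.key = key;
        this.size = size;
        this.slide = slide;
    }

    // Hypothetical window function: receives the key, the window bounds and the contents.
    static String describe(String key, long start, long end, List<Long> contents) {
        return key + " [" + start + "," + end + ") n=" + contents.size();
    }

    // One element arrives: add it to every window it belongs to (epoch-aligned,
    // no offset) and, since the trigger fires after every element, call the
    // window function once per window.
    List<String> onElement(long timestamp) {
        List<String> fired = new ArrayList<>();
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            List<Long> contents =
                contentsByWindowStart.computeIfAbsent(start, s -> new ArrayList<>());
            contents.add(timestamp);
            fired.add(describe(key, start, start + size, contents));
        }
        return fired;
    }

    public static void main(String[] args) {
        WhichWindow w = new WhichWindow("sensor-1", 60_000L, 10_000L);
        w.onElement(65_000L).forEach(System.out::println);
        w.onElement(72_000L).forEach(System.out::println);
    }
}
```

Feeding a second element at 72 s fires six windows again: a new one starting at 70 s holding one element, and five overlapping ones that now hold two.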

Another point to consider (especially for long-running windows as in your case) is incremental aggregation of window elements [1]. If you provide a FoldFunction (or ReduceFunction), it is applied to each arriving element to eagerly aggregate the elements. This means that the window only holds the aggregated value and not each individual window element. Hence, the memory/storage footprint is much smaller, and the window aggregate does not have to be recomputed for each early firing. When an element arrives, 1) the element is aggregated with the incremental aggregation function, 2) the trigger is evaluated, and 3) if the trigger fires, the window function is called.
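The three steps above can be sketched in plain Java for a single window. This is a model, not Flink's API: `IncrementalWindow`, the every-N trigger and the running mean used as the "window function" are illustrative assumptions. In Flink the aggregation would be a FoldFunction or ReduceFunction passed together with the WindowFunction. The point is that the window's state is just a count and a sum, no matter how many elements arrive.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of incremental (eager) aggregation in one window; not Flink's API.
final class IncrementalWindow {
    private long count = 0;    // aggregate state: number of elements seen
    private double sum = 0.0;  // aggregate state: running sum
    private final int every;   // hypothetical trigger: fire every N elements
    final List<Double> emitted = new ArrayList<>();

    IncrementalWindow(int every) {
        this.every = every;
    }

    void onElement(double value) {
        // 1) fold the element into the aggregate; the element itself is not stored
        count++;
        sum += value;
        // 2) evaluate the trigger
        boolean fire = count % every == 0;
        // 3) if it fires, the window function only sees the aggregate
        if (fire) {
            emitted.add(sum / count);  // hypothetical window function: running mean
        }
    }

    public static void main(String[] args) {
        IncrementalWindow w = new IncrementalWindow(3);
        for (int i = 1; i <= 6; i++) {
            w.onElement(i);
        }
        System.out.println(w.emitted);
    }
}
```

For the values 1 to 6 with a firing every 3 elements, the emitted means are 2.0 and 3.5, computed without ever buffering the individual values.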

Hope this helps,
Fabian

2016-08-30 17:59 GMT+02:00 Paul Joireman <[hidden email]>:
