Hi all,
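From the documentation: "The Trigger specifies when the function that comes after the window clause (e.g., sum, count) is evaluated (“fires”) for each window."

So, basically, if I specify:

    keyedStream
        .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
        .trigger(CountTrigger.of(100))

the window function is executed when the count reaches 100 within the 5-second time window. If you have a system that never reaches 100 events in 5 seconds, the window will basically never fire.

My question is: what would be the best option to get the following behavior?

The window function is executed when 5 seconds have elapsed, or when 100 events are received before the 5 seconds are up.

I'm thinking of implementing my own trigger that looks like CountTrigger but also fires when the end of the time window is reached (at the moment, it just returns CONTINUE instead of FIRE). But maybe there's a better way?

Is there a reason why CountTrigger is implemented the way it is today, and not as I described above (5 seconds or 100 events, whichever comes first)?

Thanks,
Anwar.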
Hi,
A Trigger is an *additional* condition for intermediate (early) evaluation of the window. Thus, it is not "or-ed" to the basic window definition.

If you want an or-ed window condition, you can customize it by specifying your own window definition:

    // MyOwnWindow extends WindowAssigner { /* put your code here */ }
    dataStream.window(new MyOwnWindow());

-Matthias

On 11/26/2015 11:40 PM, Anwar Rizal wrote:
Hi,

a regular tumbling time window of 5 seconds gets all elements within that period of time (the semantics of time vary for processing-time, ingestion-time, and event-time modes) and triggers the execution after 5 seconds.

If you define a custom trigger, the assignment policy remains the same, but the trigger condition is overwritten (it is NOT additional but replaces the default condition), i.e., in your implementation, it will only trigger when 100 elements have arrived. In order to also trigger when the window time expires, you have to register a timer (processing-time or event-time timer) via the trigger context.

NOTE: The window assigner will continue to assign elements to the window, even if the window was already evaluated. If you PURGE the window and an element arrives after that, a new window is created.

To implement your trigger, you have to register a timer in the onElement() method with:

    ctx.registerEventTimeTimer(window.getEnd())

You can do that in every onElement() call, because the timer is always overwritten.

NOTE: You should use Flink's keyed state (accessed via the trigger context) if you want to keep state such as the current count.

Hope this helps. Please let me know if you have further questions.
Fabian
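The recipe above (keep the count in trigger state, register a timer for the end of the window, fire on whichever comes first) can be sketched as a small, Flink-free Java model. Only the TriggerResult constant names mirror Flink's enum; ModelWindow, ModelContext and CountOrTimeTrigger are hypothetical stand-ins for the real window, trigger context and trigger classes, and the count field stands in for the keyed state mentioned above. It illustrates the logic under those assumptions and is not Flink's actual API.

```java
// Hedged, Flink-free model of a "count OR end of window" trigger.
// Only the TriggerResult constant names mirror Flink; all other names are hypothetical.
enum TriggerResult { CONTINUE, FIRE, PURGE, FIRE_AND_PURGE }

// A time window [start, end) in milliseconds (stand-in for Flink's TimeWindow).
final class ModelWindow {
    final long start; // inclusive
    final long end;   // exclusive

    ModelWindow(long start, long end) {
        this.start = start;
        this.end = end;
    }
}

// Stand-in for the trigger context: holds the per-window count and the registered timer.
final class ModelContext {
    long count = 0;              // plays the role of the keyed state
    Long registeredTimer = null; // one timer per window; registering again overwrites it

    void registerEventTimeTimer(long time) {
        registeredTimer = time;
    }
}

final class CountOrTimeTrigger {
    private final long maxCount;

    CountOrTimeTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    // Called for every element assigned to the window.
    TriggerResult onElement(long timestamp, ModelWindow window, ModelContext ctx) {
        // Safe to do on every element: the model keeps a single timer per window.
        ctx.registerEventTimeTimer(window.end);
        ctx.count++;
        // Count condition reached before the end of the window.
        return ctx.count >= maxCount ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
    }

    // Called when event time reaches a registered timer.
    TriggerResult onEventTime(long time, ModelWindow window, ModelContext ctx) {
        // End of window reached before the count condition: fire with whatever arrived.
        return time >= window.end ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
    }
}
```

In a real job, the same two decisions would live in a Trigger subclass's onElement() and onEventTime() methods, with the count kept in keyed state obtained through the trigger context.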
Hi Anwar,
what Fabian wrote is completely right. I just want to explain why the CountTrigger behaves as it does. The idea was to have Triggers that each clearly focus on one thing, and then at some point add combination triggers. For example, an OrTrigger that fires if either of its sub-triggers fires, or an AndTrigger that fires after both of its sub-triggers have fired. (More complex combinations are conceivable as well.)

Cheers,
Aljoscha

On 27 Nov 2015, at 09:59, Fabian Hueske wrote:
> From: Matthias J. Sax
> Sent: Friday, November 27, 2015 08:44
> Subject: Re: Doubt about window and count trigger
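The OrTrigger/AndTrigger idea can be illustrated with a tiny, Flink-free Java model in which each single-purpose trigger condition is a predicate over the window's element count, the current time and the window end. FireCondition and Conditions are hypothetical names for this sketch only; they are not Flink classes, and the model ignores purging and timers.

```java
// Hedged, Flink-free model of combining trigger conditions (OrTrigger / AndTrigger idea).
// FireCondition and Conditions are hypothetical names, not Flink classes.
@FunctionalInterface
interface FireCondition {
    // count: elements seen in the window so far; now: current time; windowEnd: end of the window.
    boolean shouldFire(long count, long now, long windowEnd);
}

final class Conditions {
    private Conditions() {}

    // Single-purpose conditions, in the spirit of CountTrigger and the time-based default trigger.
    static FireCondition countAtLeast(long n) {
        return (count, now, windowEnd) -> count >= n;
    }

    static FireCondition endOfWindow() {
        return (count, now, windowEnd) -> now >= windowEnd;
    }

    // Fires as soon as either sub-condition holds: "5 seconds or 100 events".
    static FireCondition or(FireCondition a, FireCondition b) {
        return (count, now, windowEnd) ->
                a.shouldFire(count, now, windowEnd) || b.shouldFire(count, now, windowEnd);
    }

    // Fires only when both hold. This equals "after both have fired" only because
    // both conditions above are monotonic within a window.
    static FireCondition and(FireCondition a, FireCondition b) {
        return (count, now, windowEnd) ->
                a.shouldFire(count, now, windowEnd) && b.shouldFire(count, now, windowEnd);
    }
}
```

A general AndTrigger over non-monotonic sub-triggers would have to remember which sub-triggers already fired, i.e., keep extra per-window state.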
Thanks Fabian and Aljoscha,

I tried to implement the trigger as you described. It works fine, indeed.

Thanks,
Anwar

On Fri, Nov 27, 2015 at 11:49 AM, Aljoscha Krettek wrote:
Hi Anwar,

Your trigger looks good!

I just want to make sure you know what exactly happens after a window was evaluated and purged. Once a window is purged, the whole window is cleared and removed. If a new element arrives that would have fit into the purged window, a new window with exactly the same time boundaries is created. I.e., if you have a 5-minute time window that is fired and purged in minute 4, and a new element arrives immediately after the purging, it is put into a window that will only "exist" for 1 more minute (and not into a new 5-minute window).

Cheers,
Fabian

2015-11-27 14:59 GMT+01:00 Anwar Rizal:
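The "1 more minute" follows from how tumbling windows are assigned: an element with timestamp t lands in the window starting at t - (t % size). The sketch below assumes that formula (non-negative timestamps, no window offset) and uses hypothetical helper names; it shows that an element arriving right after a purge in minute 4 ends up in a window [0, 5 min) with one minute left.

```java
// Hedged model of tumbling-window assignment, assuming non-negative timestamps and no offset.
// TumblingModel and its methods are hypothetical helper names.
final class TumblingModel {
    private TumblingModel() {}

    // Start of the tumbling window that contains the timestamp.
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Exclusive end of that window.
    static long windowEnd(long timestamp, long size) {
        return windowStart(timestamp, size) + size;
    }

    // How much of the window is left when an element with this timestamp (re-)creates it.
    static long remaining(long timestamp, long size) {
        return windowEnd(timestamp, size) - timestamp;
    }
}
```

Because the boundaries depend only on the timestamp, a purge cannot shift them; the next full 5-minute window starts at minute 5.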
Thanks Fabian,

Just for completeness: in that 1-minute window, is my modified count trigger still valid? Say, if in that one-minute window I have 100 events after 30 s, will it still fire at the 30th second?

Cheers,
Anwar.

On Fri, Nov 27, 2015 at 3:31 PM, Fabian Hueske wrote:
When a window is purged, the Trigger and its state are also cleared. A new window comes with a new Trigger (and a new state). So yes, in your example the window will fire again after 30 seconds.

2015-11-27 16:01 GMT+01:00 Anwar Rizal:
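The per-window lifecycle of trigger state can be modeled with a map keyed by window start: the entry is created with the first element and removed when the window fires and is purged, so a re-created window with the same boundaries starts counting from zero. PerWindowCountState is a hypothetical class; it assumes tumbling windows assigned as start = t - (t % size) and models only the count condition (the end-of-window timer is left out).

```java
import java.util.HashMap;
import java.util.Map;

// Hedged model of per-window trigger state that is dropped on FIRE_AND_PURGE.
// PerWindowCountState is hypothetical; only the count condition is modeled here.
final class PerWindowCountState {
    private final long windowSize;
    private final long maxCount;
    // Trigger state keyed by window start; an entry lives only as long as its window.
    private final Map<Long, Long> countByWindow = new HashMap<>();

    PerWindowCountState(long windowSize, long maxCount) {
        this.windowSize = windowSize;
        this.maxCount = maxCount;
    }

    // Returns true if this element makes its window fire (and be purged).
    boolean onElement(long timestamp) {
        long start = timestamp - (timestamp % windowSize); // tumbling-window assignment
        long count = countByWindow.merge(start, 1L, Long::sum);
        if (count >= maxCount) {
            // Purge: the window and its trigger state go away together, so a later
            // element for the same time range starts a new window counting from zero.
            countByWindow.remove(start);
            return true;
        }
        return false;
    }

    // Number of windows currently holding trigger state.
    int liveWindows() {
        return countByWindow.size();
    }
}
```

Dropping the entry on purge is also what keeps the map from growing without bound; in a full count-or-time trigger, the end-of-window timer would remove windows that never reach the count.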
Hi!

Otherwise (if the trigger state were not cleared together with the window), memory consumption would just grow indefinitely, holding the state of old triggers.

Greetings,
Stephan

On Fri, Nov 27, 2015 at 4:05 PM, Fabian Hueske wrote: