hi, I need to calculate some counts for the day but also emit the partial counts periodically, I think triggers may help me, I'm searching around but there's not much content about it, any tip? |
Hi Luis, these blogposts should help you with the periodic partial result trigger [1] [2].Time windows are by default aligned to 1970-01-01-00:00:00. So a 24 hour window will always start at 00:00. [1] http://flink.apache.org/news/2015/12/04/Introducing-windows.html [2] https://www.mapr.com/blog/essential-guide-streaming-first-processing-apache-flink 2016-10-27 16:31 GMT+02:00 Luis Mariano Guerra <[hidden email]>:
|
On Thu, Oct 27, 2016 at 4:37 PM, Fabian Hueske <[hidden email]> wrote:
Hi, thanks for the links, I read them and tried to implement what I need, everything seems to work as expected except for the fact that the partial results aren't emitted, I created a gist with my PartialWindowTrigger implementation and the relevant part of the job: https://gist.github.com/anonymous/041987821e37ee8f862ce1857bb074ea Is the problem in the trigger? do I have to create a window assigner too? is it because of the windowAll? I reproduce part of the readme from the gist here for convenience, please see the readme for the PartialWindowTrigger implementation and the rest of the logs: What the job does (or I think it does) is to:
Here is the job's relevant part: return input.keyBy(keySelector) The problem is that the triggers FIRE correctly but no partial results (every 2 seconds) are emitted, only the final result (every 10 seconds) is emitted. Even if instead of returning FIRE on onElement, I do: ctx.registerEventTimeTimer(timestamp); and return FIRE or FIRE_AND_PURGE on onEvent it still doesn't emit the partial values. There's a commented line on the job that registers a PartialWindowTrigger for the windowAll window but still doesn't work if uncommented. I added println's on the trigger and on the job steps, this is the output: 2016-11-03T11:07:04.180+01:00 onElement FIRE 2016-11-03T11:07:04.232+01:00 multiCountWindowFn 1 2016-11-03T11:07:04.305+01:00 windowAllFold 2016-11-03T11:07:04.733+01:00 timeWindowFold 2016-11-03T11:07:04.681+01:00 onElement CONTINUE 2016-11-03T11:07:05.234+01:00 timeWindowFold 2016-11-03T11:07:05.182+01:00 onElement CONTINUE 2016-11-03T11:07:05.735+01:00 timeWindowFold 2016-11-03T11:07:05.682+01:00 onElement CONTINUE 2016-11-03T11:07:06.236+01:00 timeWindowFold 2016-11-03T11:07:06.183+01:00 onElement FIRE
|
Hi Luis,
Can you try to comment the whole final windowing and see if this is works? This includes the following lines: .windowAll(TumblingEventTimeWindows.of(Time.of(windowTime, timeUnit))) .trigger(new PartialWindowTrigger<>(partialWindowTime, timeUnit, windowTime, timeUnit)) .apply(creator.create(), windowAllFold, windowAllMerge); An additional note is that I would go for registering an event time timer at the onEventTime instead of checking the timestamp on the onElement(). This is because with your implementation, in order to fire a computation, you always have to wait for an element outside the partial window interval to arrive. Cheers, Kostas
|
Hi Luis, You may try ContinuousEventTimeTrigger that continuously fire on given time interval instead of writing your own. Note that we recently fixed a bug for this trigger so I think only the trunk version is working. Cheers, Manu On Thu, Nov 3, 2016 at 9:07 PM Kostas Kloudas <[hidden email]> wrote:
|
In reply to this post by Kostas Kloudas
On Thu, Nov 3, 2016 at 2:06 PM, Kostas Kloudas <[hidden email]> wrote:
commenting it emits on fire, how do I make the trigger "go thorough" the windowAll, or if not possible, how can I join the substreams in one stream and respect the trigger?
then I think I understood the purpose of registering the event time timer wrong, isn't "ctx.registerEventTimeTimer(window.getEnd())" called to register a timer to call onEventTime?
|
Hi Luis,
You cannot have event-time early firings on both chained window operators. The reason is that each early result from the first window operator will have a timestamp equal to window.maxTimestamp-1. So in the second windowing operator, they will be buffered until the watermark signaling the end of the window arrives. Now for the second point, I think that what you have understood is correct. The "ctx.registerEventTimeTimer(window.getEnd())” registers a timer to call the onEventTime(). Cheers, Kostas > On Nov 3, 2016, at 3:29 PM, Luis Mariano Guerra <[hidden email]> wrote: > > On Thu, Nov 3, 2016 at 2:06 PM, Kostas Kloudas <[hidden email]> wrote: > Hi Luis, > > Can you try to comment the whole final windowing and see if this is works? > This includes the following lines: > > .windowAll(TumblingEventTimeWindows.of(Time.of(windowTime, timeUnit))) > .trigger(new PartialWindowTrigger<>(partialWindowTime, timeUnit, windowTime, timeUnit)) > .apply(creator.create(), windowAllFold, windowAllMerge); > > > commenting it emits on fire, how do I make the trigger "go thorough" the windowAll, or if not possible, how can I join the substreams in one stream and respect the trigger? > > An additional note is that I would go for registering an event time timer at the onEventTime > instead of checking the timestamp on the onElement(). This is because with your implementation, > in order to fire a computation, you always have to wait for an element outside the partial window interval to arrive. > > then I think I understood the purpose of registering the event time timer wrong, isn't "ctx.registerEventTimeTimer(window.getEnd())" called to register a timer to call onEventTime? > > > Cheers, > Kostas > >> On Nov 3, 2016, at 11:31 AM, Luis Mariano Guerra <[hidden email]> wrote: >> >> .windowAll(TumblingEventTimeWindows.of(Time.of(windowTime, timeUnit))) >> //.trigger(new PartialWindowTrigger<>(partialWindowTime, timeUnit, windowTime, timeUnit)) >> .apply(creator.create(), windowAllFold, windowAllMerge); > > |
On Thu, Nov 3, 2016 at 7:05 PM, Kostas Kloudas <[hidden email]> wrote:
Hi Luis, so, how do I get windowAll and partial results? do I have to remove the partial calculations and do it all in one node/thread? is there another way?
then what else calls onEventTime? because if a register the event time timer inside of it something else calls it.
|
Free forum by Nabble | Edit this page |