emit partial state in window (streaming)

Luis Mariano Guerra
Hi,

I need to calculate some counts for the day but also emit the partial counts periodically. I think triggers may help me; I've been searching around, but there's not much content about them. Any tips?

For example, I'm counting accesses by location to different services. I want to accumulate accesses during the whole day, but emit the partial count every 5 minutes.

One slightly related question: is there a way to align a window to a day? For example, a 24-hour window that starts at 00:00.

Thanks.

Re: emit partial state in window (streaming)

Fabian Hueske-2
Hi Luis,

These blog posts should help you with the periodic partial result trigger [1] [2].

Regarding the second question:
Time windows are by default aligned to the epoch, 1970-01-01 00:00:00 UTC.
So a 24-hour window will always start at 00:00 UTC.
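
To make the alignment concrete, here is a minimal sketch in plain Java (not Flink's API) of the arithmetic that epoch-aligned tumbling windows imply. The class and method names are hypothetical, and millisecond timestamps are assumed. Midnight here means midnight UTC; a window aligned to local midnight would need an offset, which this sketch does not cover.

```java
import java.time.Instant;

// Hypothetical helper, for illustration only; not part of Flink.
final class WindowAlignment {

    // Start of the epoch-aligned tumbling window that contains timestampMs.
    // floorMod keeps the result correct for timestamps before 1970 as well.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    public static void main(String[] args) {
        long dayMs = 24L * 60 * 60 * 1000;
        long ts = Instant.parse("2016-10-27T14:31:00Z").toEpochMilli();
        long start = windowStart(ts, dayMs);
        System.out.println(Instant.ofEpochMilli(start));         // 2016-10-27T00:00:00Z
        System.out.println(Instant.ofEpochMilli(start + dayMs)); // 2016-10-28T00:00:00Z
    }
}
```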

Best, Fabian

[1] http://flink.apache.org/news/2015/12/04/Introducing-windows.html
[2] https://www.mapr.com/blog/essential-guide-streaming-first-processing-apache-flink


Re: emit partial state in window (streaming)

Luis Mariano Guerra
On Thu, Oct 27, 2016 at 4:37 PM, Fabian Hueske <[hidden email]> wrote:
> Hi Luis,
>
> these blog posts should help you with the periodic partial result trigger [1] [2].

Hi, thanks for the links. I read them and tried to implement what I need. Everything seems to work as expected, except that the partial results aren't emitted. I created a gist with my PartialWindowTrigger implementation and the relevant part of the job:

https://gist.github.com/anonymous/041987821e37ee8f862ce1857bb074ea

Is the problem in the trigger?
Do I have to create a window assigner too?
Is it because of the windowAll?

I reproduce part of the README from the gist here for convenience; please see the README for the PartialWindowTrigger implementation and the rest of the logs:

What the job does (or what I think it does):

  • KeyBy the first string of a field
  • Create a tumbling window of 10 seconds
  • Register my PartialWindowTrigger that will trigger every 2 seconds (FIRE) and after the 10 second window (FIRE_AND_PURGE)
  • Fold on each partition to create a partial accumulation
  • Join all the partial results into a unique place through windowAll
  • Aggregate the partial aggregations into one result
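
As a rough illustration of the FIRE versus FIRE_AND_PURGE behaviour described in the list above, here is a plain-Java model (not Flink code; every name in it is hypothetical). It assumes FIRE emits the current accumulation and keeps the window state, while FIRE_AND_PURGE emits it and then discards the state. The rate of four elements per 2-second interval is an assumption chosen only for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one window's state; not Flink's window operator.
final class PartialCountWindow {

    enum TriggerResult { CONTINUE, FIRE, FIRE_AND_PURGE }

    private long count = 0;
    private final List<Long> emitted = new ArrayList<>();

    // Fold one element into the running count.
    void add() {
        count++;
    }

    // Apply the trigger's decision to the window state.
    void onTrigger(TriggerResult result) {
        if (result == TriggerResult.CONTINUE) {
            return;                 // nothing is emitted, state is kept
        }
        emitted.add(count);         // FIRE and FIRE_AND_PURGE both emit
        if (result == TriggerResult.FIRE_AND_PURGE) {
            count = 0;              // only the purge discards the state
        }
    }

    List<Long> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        PartialCountWindow window = new PartialCountWindow();
        // A 10-second window split into five 2-second intervals.
        for (int interval = 1; interval <= 5; interval++) {
            for (int i = 0; i < 4; i++) {
                window.add();
            }
            window.onTrigger(interval < 5 ? TriggerResult.FIRE : TriggerResult.FIRE_AND_PURGE);
        }
        System.out.println(window.emitted()); // [4, 8, 12, 16, 20]
    }
}
```

Each early firing reports the count accumulated so far, and the last one reports the full window before the state is cleared.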

Here is the job's relevant part:

    return input.keyBy(keySelector)
        // per-key tumbling window with early (partial) firings
        .timeWindow(Time.of(windowTime, timeUnit))
        .trigger(new PartialWindowTrigger<>(partialWindowTime, timeUnit, windowTime, timeUnit))
        .apply(creator.create(), timeWindowFold, timeWindowMerge)
        // non-keyed window that combines the per-key partial results
        .windowAll(TumblingEventTimeWindows.of(Time.of(windowTime, timeUnit)))
        //.trigger(new PartialWindowTrigger<>(partialWindowTime, timeUnit, windowTime, timeUnit))
        .apply(creator.create(), windowAllFold, windowAllMerge);

The problem is that the triggers FIRE correctly, but no partial results (every 2 seconds) are emitted; only the final result (every 10 seconds) is emitted.

Even if, instead of returning FIRE in onElement, I do:

ctx.registerEventTimeTimer(timestamp);

and return FIRE or FIRE_AND_PURGE in onEventTime, it still doesn't emit the partial values.

There's a commented-out line in the job that registers a PartialWindowTrigger for the windowAll window, but it still doesn't work when uncommented.

I added println calls to the trigger and to the job steps; this is the output:

2016-11-03T11:07:04.180+01:00 onElement FIRE
2016-11-03T11:07:04.232+01:00 multiCountWindowFn        1
2016-11-03T11:07:04.305+01:00 windowAllFold
2016-11-03T11:07:04.733+01:00 timeWindowFold
2016-11-03T11:07:04.681+01:00 onElement CONTINUE
2016-11-03T11:07:05.234+01:00 timeWindowFold
2016-11-03T11:07:05.182+01:00 onElement CONTINUE
2016-11-03T11:07:05.735+01:00 timeWindowFold
2016-11-03T11:07:05.682+01:00 onElement CONTINUE
2016-11-03T11:07:06.236+01:00 timeWindowFold
2016-11-03T11:07:06.183+01:00 onElement FIRE

<3 more blocks like the one above here>

2016-11-03T11:07:12.246+01:00 multiCountWindowFn 1
2016-11-03T11:07:12.317+01:00 windowAllFold
2016-11-03T11:07:12.746+01:00 timeWindowFold
2016-11-03T11:07:12.693+01:00 onElement CONTINUE
2016-11-03T11:07:13.247+01:00 timeWindowFold
2016-11-03T11:07:13.194+01:00 onElement CONTINUE
2016-11-03T11:07:13.748+01:00 timeWindowFold
2016-11-03T11:07:13.695+01:00 onElement CONTINUE
2016-11-03T11:07:09.999+01:00 onEventTime FIRE_AND_PURGE
2016-11-03T11:07:13.948+01:00 multiCountWindowFn 1
2016-11-03T11:07:14.020+01:00 windowAllFold
2016-11-03T11:07:14.020+01:00 allWindowMerger 1 {"blue":{"foo":{"v":65}},"$":{"ts":1478167634020}}



Re: emit partial state in window (streaming)

Kostas Kloudas
Hi Luis,

Can you try commenting out the whole final windowing step and see if this works?
This includes the following lines:

  .windowAll(TumblingEventTimeWindows.of(Time.of(windowTime, timeUnit)))
  .trigger(new PartialWindowTrigger<>(partialWindowTime, timeUnit, windowTime, timeUnit))
  .apply(creator.create(), windowAllFold, windowAllMerge);

An additional note: I would register an event-time timer and react to it in onEventTime(),
instead of checking the timestamp in onElement(). With your implementation,
a computation can only fire once an element outside the partial window interval arrives.
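
The difference between the two approaches can be sketched with a small plain-Java simulation (not Flink's Trigger API; all names and sample timestamps are hypothetical). It assumes a 2-second partial interval, a window starting at timestamp 0, and a watermark that keeps advancing even while this key receives no elements, for example because other keys deliver data.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation; it only models when a firing would happen.
final class FiringSimulation {

    static final long INTERVAL_MS = 2_000;

    // Element-driven: the trigger can only fire when an element of this key
    // arrives whose timestamp lies past the current partial interval.
    static List<Long> elementDriven(long[] elementTimestamps) {
        List<Long> firedAt = new ArrayList<>();
        long nextFire = INTERVAL_MS;
        for (long ts : elementTimestamps) {
            if (ts >= nextFire) {
                firedAt.add(ts);
                nextFire = ts - (ts % INTERVAL_MS) + INTERVAL_MS;
            }
        }
        return firedAt;
    }

    // Timer-driven: a timer is set for the next interval boundary and fires
    // as soon as the watermark reaches it; then the next timer is set.
    static List<Long> timerDriven(long firstElementTs, long[] watermarks) {
        List<Long> firedAt = new ArrayList<>();
        long timer = firstElementTs - (firstElementTs % INTERVAL_MS) + INTERVAL_MS;
        for (long watermark : watermarks) {
            while (timer <= watermark) {
                firedAt.add(timer);
                timer += INTERVAL_MS;
            }
        }
        return firedAt;
    }

    public static void main(String[] args) {
        long[] elements = {0, 500, 1_000, 7_000};   // a gap between 1 s and 7 s
        long[] watermarks = {1_000, 2_000, 3_000, 4_000, 5_000, 6_000, 7_000};
        System.out.println(elementDriven(elements));       // [7000]
        System.out.println(timerDriven(0, watermarks));    // [2000, 4000, 6000]
    }
}
```

With the gap in the input, the element-driven variant stays silent until the element at 7 s shows up, while the timer-driven variant fires at every 2-second boundary the watermark passes.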

Cheers,
Kostas


Re: emit partial state in window (streaming)

Manu Zhang
Hi Luis,

You could try ContinuousEventTimeTrigger, which fires continuously at a given time interval, instead of writing your own.
Note that we recently fixed a bug in this trigger, so I think only the trunk version works.

Cheers,
Manu


Re: emit partial state in window (streaming)

Luis Mariano Guerra
In reply to this post by Kostas Kloudas
On Thu, Nov 3, 2016 at 2:06 PM, Kostas Kloudas <[hidden email]> wrote:
> Hi Luis,
>
> Can you try commenting out the whole final windowing step and see if this works?
> This includes the following lines:
>
>   .windowAll(TumblingEventTimeWindows.of(Time.of(windowTime, timeUnit)))
>   .trigger(new PartialWindowTrigger<>(partialWindowTime, timeUnit, windowTime, timeUnit))
>   .apply(creator.create(), windowAllFold, windowAllMerge);

With it commented out, results are emitted on each FIRE. How do I make the trigger "go through" the windowAll? Or, if that's not possible, how can I join the substreams into one stream and still respect the trigger?

> An additional note: I would register an event-time timer and react to it in onEventTime(),
> instead of checking the timestamp in onElement(). With your implementation,
> a computation can only fire once an element outside the partial window interval arrives.

Then I think I misunderstood the purpose of registering the event-time timer. Isn't "ctx.registerEventTimeTimer(window.getEnd())" called to register a timer that calls onEventTime?
 


Re: emit partial state in window (streaming)

Kostas Kloudas
Hi Luis,

You cannot have event-time early firings on both chained window operators.
The reason is that each early result from the first window operator will have a timestamp equal to window.maxTimestamp() (that is, the window end minus 1).
So in the second windowing operator, these results are buffered until the watermark signaling the end of the window arrives.
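
The timing argument can be checked with a few lines of plain Java (a sketch, not Flink code; all names are hypothetical). It assumes both operators use the same 10-second epoch-aligned tumbling window, that every result of the first window is stamped with the last timestamp belonging to that window, and that the second window uses default event-time firing, so it emits only once the watermark reaches its own last timestamp.

```java
// Hypothetical model of the timestamps involved; not Flink's window operator.
final class ChainedWindowTiming {

    static final long WINDOW_MS = 10_000;

    // Start of the epoch-aligned window containing ts (non-negative ts assumed).
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_MS);
    }

    // Assumption: every result of a window, early or final, carries the
    // last timestamp that still belongs to that window (end - 1).
    static long resultTimestamp(long windowStart) {
        return windowStart + WINDOW_MS - 1;
    }

    // Default event-time behaviour of the second window: it fires once the
    // watermark has reached the last timestamp of the window holding the result.
    static boolean secondWindowCanFire(long resultTs, long watermark) {
        long maxTimestamp = windowStart(resultTs) + WINDOW_MS - 1;
        return watermark >= maxTimestamp;
    }

    public static void main(String[] args) {
        long firstWindowStart = windowStart(1_500);              // window [0, 10000)
        long resultTs = resultTimestamp(firstWindowStart);       // 9999
        System.out.println(windowStart(resultTs));                 // 0
        System.out.println(secondWindowCanFire(resultTs, 2_000));  // false
        System.out.println(secondWindowCanFire(resultTs, 9_999));  // true
    }
}
```

An early result produced when the watermark is at 2 s therefore lands in the second operator's window [0, 10000) and waits there until the watermark reaches 9999.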

Now for the second point, I think that what you have understood is correct.
The "ctx.registerEventTimeTimer(window.getEnd())" call registers a timer that invokes onEventTime().

Cheers,
Kostas



Re: emit partial state in window (streaming)

Luis Mariano Guerra
On Thu, Nov 3, 2016 at 7:05 PM, Kostas Kloudas <[hidden email]> wrote:
> Hi Luis,
>
> You cannot have event-time early firings on both chained window operators.
> The reason is that each early result from the first window operator will have a timestamp equal to window.maxTimestamp() (that is, the window end minus 1).
> So in the second windowing operator, these results are buffered until the watermark signaling the end of the window arrives.

So how do I get windowAll and partial results? Do I have to remove the partial calculations and do it all in one node/thread? Is there another way?

> Now for the second point, I think that what you have understood is correct.
> The "ctx.registerEventTimeTimer(window.getEnd())" call registers a timer that invokes onEventTime().

Then what else calls onEventTime? Because if I register the event-time timer inside of it, something else is calling it.

 
