Emit intermediate accumulator state of a session window

chandu soa

I want to emit the intermediate accumulator state (the AggregateFunction ACC value) of a session window whenever a new event arrives.

AggregateFunction#getResult() is called only when the window completes. I also need to emit the intermediate accumulator values (the result of each AggregateFunction#add() call) and write them to a sink. Both AggregateFunction#getResult() and ProcessWindowFunction provide the aggregated result only when the window is closed.

How can I emit or stream the intermediate accumulator state as soon as a new event arrives, while the window is still open? Do I need to implement a custom trigger or assigner?

To give you some background: when a user watches a video, we get an event when the user clicks on it, then one roughly every 15 minutes, and finally one when the user closes the video.

I need to aggregate these events as soon as they arrive and post the result to the destination. For example, if a user watches a two-hour movie, I get events at 15-minute intervals (0, 15, 30, ..., 120). Whenever I get an event, I need to aggregate the percentage watched so far and write it to the sink (0%, 12.5%, 25%, ..., 100%). The implementation below emits (via getResult()) only a single event, 20 minutes after the user has stopped watching the video.

    .window(EventTimeSessionWindows.withGap(Time.minutes(20)))
    .aggregate(new EventAggregator())
    .filter(new FinalFilter())
    .addSink(...)
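
The arithmetic in this example can be illustrated without Flink. The sketch below is plain Java, not the poster's EventAggregator: the class names WatchAccumulator and RunningWatchDemo, and the choice to track the furthest playback position, are assumptions made for illustration. It only shows the desired behaviour, which is to read the aggregate after every add() instead of once at the end of the session.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical accumulator: tracks the furthest playback position seen so far. */
class WatchAccumulator {
    private final double videoMinutes;  // total length of the video
    private double maxPositionMinutes;  // furthest position reported so far

    WatchAccumulator(double videoMinutes) {
        this.videoMinutes = videoMinutes;
    }

    /** Plays the role of AggregateFunction#add(): fold one event into the accumulator. */
    void add(double positionMinutes) {
        maxPositionMinutes = Math.max(maxPositionMinutes, positionMinutes);
    }

    /** Plays the role of AggregateFunction#getResult(): read the current aggregate. */
    double getResult() {
        return 100.0 * maxPositionMinutes / videoMinutes;
    }
}

class RunningWatchDemo {
    /** Emits one result per incoming event instead of one result per session. */
    static List<Double> emitPerEvent(double videoMinutes, double[] positions) {
        WatchAccumulator acc = new WatchAccumulator(videoMinutes);
        List<Double> emitted = new ArrayList<>();
        for (double position : positions) {
            acc.add(position);
            emitted.add(acc.getResult()); // intermediate value, emitted immediately
        }
        return emitted;
    }

    public static void main(String[] args) {
        double[] positions = {0, 15, 30, 45, 60, 75, 90, 105, 120};
        // prints [0.0, 12.5, 25.0, 37.5, 50.0, 62.5, 75.0, 87.5, 100.0]
        System.out.println(emitPerEvent(120, positions));
    }
}
```

With a two-hour video and events every 15 minutes, this yields the 0%, 12.5%, 25%, ..., 100% sequence described above.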


Appreciate your help.


Thanks,

chandu


Re: Emit intermediate accumulator state of a session window

Andrey Zagrebin-5
Hi Chandu,

I am not sure whether using the windowing API is helpful in this case at all.

At the very least, you could consume the data not only through windowing but also through a custom stateful function.
You could look into AggregatingState [1]. Then you could do whatever you want with the current aggregated value.
If you still need to do something with the result of the windowing, you could keep doing it as you do now, or simulate it with timers [2] in that same stateful function.
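
The idea of a stateful function that keeps a running aggregate per key can be modelled in a few lines of plain Java. This is only a toy sketch and does not use the Flink API: a HashMap stands in for keyed state, a timestamp comparison stands in for the timer, and the class name PerKeyRunningSum and its parameters are hypothetical. It also assumes events arrive in timestamp order for each key.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy stand-in for a keyed stateful function; a HashMap plays the role of keyed state. */
class PerKeyRunningSum {
    private static final class Session {
        long total;         // running aggregate for the current session
        long lastEventTime; // timestamp of the most recent event
    }

    private final long gap; // session gap, in the same unit as the timestamps
    private final Map<String, Session> sessions = new HashMap<>();

    PerKeyRunningSum(long gap) {
        this.gap = gap;
    }

    /** Folds one event into the key's session and returns the updated aggregate. */
    long onEvent(String key, long timestamp, long value) {
        Session session = sessions.get(key);
        // A pause longer than the gap starts a fresh session, as an expired timer would.
        if (session == null || timestamp - session.lastEventTime > gap) {
            session = new Session();
            sessions.put(key, session);
        }
        session.total += value;
        session.lastEventTime = timestamp;
        return session.total; // available to the caller on every event
    }

    public static void main(String[] args) {
        PerKeyRunningSum fn = new PerKeyRunningSum(20); // 20-minute gap
        System.out.println(fn.onEvent("user-1", 0, 15));  // prints 15
        System.out.println(fn.onEvent("user-1", 15, 15)); // prints 30
        System.out.println(fn.onEvent("user-1", 50, 15)); // prints 15 (new session)
    }
}
```

Because the aggregate is returned from every call, the caller can forward it to a sink immediately, which is the behaviour the original question asks for.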

Best,
Andrey


On Tue, Dec 3, 2019 at 12:21 AM chandu soa <[hidden email]> wrote:


Re: Emit intermediate accumulator state of a session window

Rafi Aroch
Hi Chandu,

Maybe you can set a trigger explicitly:
     .trigger(ContinuousEventTimeTrigger.of(Time.minutes(15)))

This would fire your aggregate repeatedly, once per interval, while the window is still open.

Thanks,
Rafi


On Thu, Dec 5, 2019 at 1:09 PM Andrey Zagrebin <[hidden email]> wrote:

Re: Emit intermediate accumulator state of a session window

chandu soa
Thank you all for your responses.

I've created a custom trigger similar to Flink's built-in EventTimeTrigger, with a few changes: it fires in onElement() and does not fire in onEventTime(). This satisfies my requirement that whenever a new event arrives, the incremental result (the result of AggregateFunction#add()) is emitted immediately. The changed code block is below.

@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
    if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
        return TriggerResult.FIRE;
    } else {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.FIRE; // instead of CONTINUE
    }
}

@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
    return time == window.maxTimestamp() ?
            TriggerResult.CONTINUE : // instead of FIRE
            TriggerResult.CONTINUE;
}
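
Returning FIRE rather than FIRE_AND_PURGE matters here: FIRE evaluates the window and keeps its contents, so each emission is cumulative, while FIRE_AND_PURGE clears the contents after emitting. The toy model below illustrates that difference in plain Java. It is a sketch only: the Result enum merely mirrors the names of Flink's TriggerResult values, and TriggerModel and run() are hypothetical names, not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;

class TriggerModel {
    /** Local enum mirroring the names of Flink's TriggerResult values; not the Flink class. */
    enum Result { CONTINUE, FIRE, FIRE_AND_PURGE }

    /** Feeds values into one toy window and records what is emitted after each element. */
    static List<Integer> run(int[] values, Result onElement) {
        List<Integer> emitted = new ArrayList<>();
        int sum = 0; // the window's accumulator
        for (int value : values) {
            sum += value;
            if (onElement == Result.FIRE || onElement == Result.FIRE_AND_PURGE) {
                emitted.add(sum); // evaluate the window and emit the current aggregate
            }
            if (onElement == Result.FIRE_AND_PURGE) {
                sum = 0; // purge: the next emission starts from scratch
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        int[] minutesWatched = {15, 15, 15};
        System.out.println(run(minutesWatched, Result.FIRE));           // prints [15, 30, 45]
        System.out.println(run(minutesWatched, Result.FIRE_AND_PURGE)); // prints [15, 15, 15]
        System.out.println(run(minutesWatched, Result.CONTINUE));       // prints []
    }
}
```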

Thanks,
Chandu


On Sun, Dec 8, 2019 at 8:22 AM Rafi Aroch <[hidden email]> wrote: