Emit intermediate accumulator(AggregateFunction ACC value) state of a session window when new event arrives
AggregateFunction#getResults() is called only when window completes. My need is emit intermediate accumulator values(result of AggregateFunction#add()) as well and write them to Sink. Both AggregateFunction#getResult() and ProcessWindowFunction() provides aggregated result, only when the window is closed. Any thoughts please, how to emit or stream intermediate accumulator state as soon as new event arrive when window is open? Need to implement custom trigger or Assigner?
To give you some background, when user watches a video we get events - when clicked, thereafter every ~ 15minutes, and finally when user close the video. I need to aggregate them as soon as they arrive and post it to destination. For example, if user watching a two-hour movie I get events for 15 min interval(0,15,30,...,120), whenever I get a event need to aggregate watched percentage so far and write it to sink(0%, 12.5%, 25%,...,100%). The below implementation emitting(getResult()) a single event 20 minutes after watching a video.
.window(EventTimeSessionWindows.withGap(Time.minutes(20))) .aggregate(new EventAggregator()) .filter(new FinalFilter()) .addSink(...) Appreciate your help. Thanks, chandu |
Hi Chandu, I am not sure whether using the windowing API is helpful in this case at all. At least, you could try to consume the data not only by windowing but also by a custom stateful function. You look into the AggregatingState [1]. Then you could do whatever you want with the current aggregated value. If you still need to do something with the result of windowing, you could do it as now or simulate it with timers [2] in that same stateful function. Best, Andrey On Tue, Dec 3, 2019 at 12:21 AM chandu soa <[hidden email]> wrote:
|
Hi Chandu, Maybe you can use a custom trigger: .trigger(ContinuousEventTimeTrigger.of(Time.minutes(15))) This would continuously trigger your aggregate every period of time. Thanks, Rafi On Thu, Dec 5, 2019 at 1:09 PM Andrey Zagrebin <[hidden email]> wrote:
|
Thank you all for your responses. I've created a custom trigger similar to flink provided EventTimeTrigger, with few changes. Fire event on onElement(), and do not fire event on onEventTime() to satisfy my requirement - whenever new event arrives fire incremental result(result of AggregateFunction#add()) immediately. Find below changed code block. @Override public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception { if (window.maxTimestamp() <= ctx.getCurrentWatermark()) { return TriggerResult.FIRE; } else { ctx.registerEventTimeTimer(window.maxTimestamp()); return TriggerResult.FIRE; // instead of CONTINUE } } @Override public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) { return time == window.maxTimestamp() ? TriggerResult.CONTINUE : // instead of FIRE TriggerResult.CONTINUE; } Thanks, Chandu On Sun, Dec 8, 2019 at 8:22 AM Rafi Aroch <[hidden email]> wrote:
|
Free forum by Nabble | Edit this page |