Hello!
I'm trying to figure out how to implement a window that emits events at regular intervals or when a specific event is encountered. A bit of background: I have a stream of events from devices that send events to our system whenever a user watches a video. These events include a unique ID (sessionId) shared by all events of the same session, so I want to key my stream on it. After that, I want to aggregate all the events into a session summary and emit this summary every 5 minutes, while still keeping the summary in the window (in case more events for that session arrive). However, if I receive an end event (sent by the device once a user stops watching the video), I want to emit the summary and remove it from the window. Is it possible to do this with one of the existing windows together with a trigger, or in some other way? I've been trying to figure it out by reading the docs but haven't gotten any wiser, so I'm turning to the mailing list for help. Best regards, Tim
Hi Tim, I think you could use Flink's trigger API [1] to implement a custom trigger that fires when it sees a certain event (your end event) or after a certain amount of time has passed. [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#triggers Cheers, Till On Wed, Apr 28, 2021 at 5:25 PM Tim Josefsson <[hidden email]> wrote:
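To make the idea concrete, here is a minimal plain-Java sketch of the decision logic such a trigger could implement, assuming a hypothetical `Event` type whose `type` field is `"END"` for the end event. It has no Flink dependency: the `TriggerResult` enum below only mirrors the names of Flink's real `TriggerResult` values, and `onElement`/`onProcessingTime` only loosely mirror the corresponding `Trigger` callbacks. `FIRE` emits the summary while keeping the window contents; `FIRE_AND_PURGE` emits and then drops them.

```java
import java.util.ArrayList;
import java.util.List;

public class SessionTriggerSketch {

    // Stand-in for Flink's TriggerResult: the names match the real enum,
    // but this is plain Java with no Flink dependency.
    enum TriggerResult { CONTINUE, FIRE, FIRE_AND_PURGE }

    // Hypothetical event type; "END" marks that the user stopped watching.
    static final class Event {
        final String sessionId;
        final String type;
        Event(String sessionId, String type) { this.sessionId = sessionId; this.type = type; }
    }

    static final long INTERVAL_MS = 5 * 60 * 1000L; // emit a summary every 5 minutes

    // The single pending periodic timer for this window (null = none registered).
    private Long nextTimer = null;

    // For each element: an end event emits and drops the window contents;
    // any other event just makes sure a periodic timer is pending.
    TriggerResult onElement(Event e, long now) {
        if ("END".equals(e.type)) {
            nextTimer = null;
            return TriggerResult.FIRE_AND_PURGE;
        }
        if (nextTimer == null) {
            nextTimer = now + INTERVAL_MS;
        }
        return TriggerResult.CONTINUE;
    }

    // When the timer fires: emit the summary but keep the window contents,
    // then schedule the next periodic firing.
    TriggerResult onProcessingTime(long time) {
        if (nextTimer != null && time == nextTimer) {
            nextTimer = time + INTERVAL_MS;
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    public static void main(String[] args) {
        SessionTriggerSketch trigger = new SessionTriggerSketch();
        List<TriggerResult> results = new ArrayList<>();
        results.add(trigger.onElement(new Event("s1", "PLAY"), 0));
        results.add(trigger.onProcessingTime(INTERVAL_MS));                 // first periodic emit
        results.add(trigger.onElement(new Event("s1", "PROGRESS"), INTERVAL_MS + 1));
        results.add(trigger.onProcessingTime(2 * INTERVAL_MS));             // second periodic emit
        results.add(trigger.onElement(new Event("s1", "END"), 2 * INTERVAL_MS + 5));
        System.out.println(results); // [CONTINUE, FIRE, CONTINUE, FIRE, FIRE_AND_PURGE]
    }
}
```

In an actual Flink `Trigger` subclass, the timer would be registered with `ctx.registerProcessingTimeTimer(...)`, and the pending-timer bookkeeping would live in trigger state obtained via `ctx.getPartitionedState(...)` rather than in a field, because one trigger instance is shared across all keys and windows.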
|
Thanks for the suggestions! I'll see if I can implement something that works! A follow-up question, more related to state: if I implement either the custom trigger or the process function, how will they handle failures such as crashes? For instance, if I have a checkpointing interval of 10s, will the job recover from the last checkpoint with all the summaries as they were at that point, or do I have to implement specific ValueStates in both cases? On Thu, 29 Apr 2021 at 10:25, Till Rohrmann <[hidden email]> wrote:
|
If you use the Trigger API, you don't have to do anything special for fault tolerance: the window contents are kept in Flink-managed state and are checkpointed automatically. When using a ProcessFunction, you should use Flink's state primitives (e.g. ValueState) to store your state; this state is then checkpointed automatically as well. In case of a failure, Flink always resumes from the latest successfully completed checkpoint. Cheers, Till On Thu, Apr 29, 2021 at 12:25 PM Tim Josefsson <[hidden email]> wrote:
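The recovery behaviour described above can be illustrated with a toy model. The sketch below assumes per-session summaries reduced to simple event counts and a replayable input (for example a source that can rewind to stored offsets). It snapshots the keyed state together with the input position every few events, "crashes" once, restores the last completed snapshot, rewinds the input, and ends with the same state as a run without a failure. It illustrates the idea only; it is not how Flink implements checkpointing, which works with checkpoint barriers flowing through the dataflow.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CheckpointSketch {

    // Process the input, checkpointing every `interval` events and simulating a
    // single crash when the input position reaches `crashAt` (-1 = no crash).
    static Map<String, Integer> run(List<String> input, int interval, int crashAt) {
        Map<String, Integer> state = new HashMap<>();    // live keyed state: sessionId -> event count
        Map<String, Integer> snapshot = new HashMap<>(); // last completed checkpoint
        int snapshotOffset = 0;                          // input position stored with the checkpoint
        boolean crashed = false;
        int offset = 0;
        while (offset < input.size()) {
            if (!crashed && offset == crashAt) {
                crashed = true;
                // Recovery: restore the last completed checkpoint and rewind the input.
                state = new HashMap<>(snapshot);
                offset = snapshotOffset;
                continue;
            }
            state.merge(input.get(offset), 1, Integer::sum); // update this session's summary
            offset++;
            if (offset % interval == 0) {
                // Take a checkpoint: copy the state and remember where the input was.
                snapshot = new HashMap<>(state);
                snapshotOffset = offset;
            }
        }
        return state;
    }

    public static void main(String[] args) {
        List<String> input = List.of("s1", "s2", "s1", "s1", "s2", "s3", "s1");
        Map<String, Integer> noFailure = run(input, 3, -1);
        Map<String, Integer> withFailure = run(input, 3, 5); // crash after 5 events
        System.out.println(noFailure.equals(withFailure));
    }
}
```

The events between the checkpoint and the crash are processed twice, so summaries emitted in that span may be emitted again after recovery unless the sink is transactional or idempotent; the state itself still reflects each event exactly once.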
|
Thanks! I've managed to implement a working solution with the Trigger API, but I'm not exactly sure why it works. I'm doing the following: DataStream<SessionSummary> summaries = env [ ... ] With the following trigger (omitting parts of the trigger): [ ... ] @Override [ ... ] So what I'm doing is calling ctx.registerProcessingTimeTimer(window.maxTimestamp()); however, I only register this timer once, at the first event. But when testing, it works as I want: it fires every ten seconds and then fires and purges only after no events have been received for 2 minutes (the gap specified for the session window). Is the processing-time timer being updated every time the window end time is increased (I noticed that Flink increases it in the background every time a new event arrives)? I'm happy with my solution; I'm just trying to figure out how things work! Cheers, Tim On Thu, 29 Apr 2021 at 18:42, Till Rohrmann <[hidden email]> wrote:
|
Hi Tim, Session windows work by first creating a new window for every incoming event and then merging overlapping windows. That's why you see the end time of a window increase with every new incoming event. I hope this explains what you are seeing. Apart from that, I think the SessionTrigger looks good to me. Cheers, Till On Fri, Apr 30, 2021 at 9:27 AM Tim Josefsson <[hidden email]> wrote:
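A minimal plain-Java sketch of this assign-then-merge behaviour, assuming millisecond timestamps and the 2-minute gap from Tim's setup: each event gets its own window `[t, t + gap)`, and overlapping windows are combined (this sketch also merges windows that merely touch), so the merged window's end, and hence `maxTimestamp()` (end - 1), moves forward with each new event. As for the timer: Flink requires a trigger used with merging windows to return `true` from `canMerge()` and calls its `onMerge(...)` whenever windows merge; the built-in `ProcessingTimeTrigger` re-registers a timer for the merged window's `maxTimestamp()` there, which is presumably what happens in the omitted parts of Tim's trigger.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SessionMergeSketch {

    // Half-open interval [start, end), like a session window; maxTimestamp = end - 1.
    static final class Window {
        final long start;
        final long end;
        Window(long start, long end) { this.start = start; this.end = end; }
        long maxTimestamp() { return end - 1; }
        @Override public String toString() { return "[" + start + ", " + end + ")"; }
    }

    // Each event first gets its own window [t, t + gap); overlapping or touching
    // windows are then merged into one window spanning all of them.
    static List<Window> assignAndMerge(List<Long> timestamps, long gap) {
        List<Window> windows = new ArrayList<>();
        for (long t : timestamps) {
            windows.add(new Window(t, t + gap));
        }
        windows.sort(Comparator.comparingLong((Window w) -> w.start));
        List<Window> merged = new ArrayList<>();
        for (Window w : windows) {
            Window last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
            if (last != null && w.start <= last.end) {
                // Extend the current session instead of starting a new one.
                merged.set(merged.size() - 1, new Window(last.start, Math.max(last.end, w.end)));
            } else {
                merged.add(w);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        long gap = 120_000L; // 2-minute session gap
        List<Long> ts = List.of(0L, 60_000L, 150_000L, 400_000L);
        // Feeding the events one by one: the session's end grows with each event.
        for (int i = 1; i <= 3; i++) {
            System.out.println(assignAndMerge(ts.subList(0, i), gap));
        }
        // prints [[0, 120000)], then [[0, 180000)], then [[0, 270000)]
        System.out.println(assignAndMerge(ts, gap)); // [[0, 270000), [400000, 520000)]
    }
}
```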
|