How to implement a window that emits events at regular intervals and on specific events


Tim Josefsson
Hello!

I'm trying to figure out how to implement a window that will emit events at regular intervals or when a specific event is encountered.

A bit of background: I have a stream of events from devices that send events to our system whenever a user watches a video. These events include a unique id (sessionId) shared by all events of the same session, so I want to key my stream on it. After that, I want to aggregate all the events into a session summary and emit this summary every 5 minutes, while still keeping the summary in the window (in case more events for that session arrive). However, if I receive an end event (sent by the device once a user stops watching the video), I want to emit the summary and remove it from the window.

Is it possible to do this with one of the existing windows together with a trigger, or in some other way? I've been trying to figure it out by reading the docs but haven't gotten any wiser, so I'm turning to the mailing list for help.

Best regards,
Tim

Re: How to implement a window that emits events at regular intervals and on specific events

Till Rohrmann
Hi Tim,

I think you could use Flink's trigger API [1] to implement a trigger which fires when it sees a certain event or after some time.


Cheers,
Till
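To illustrate the decision such a trigger makes, here is a small plain-Java model (not Flink code, and not taken from the thread). The enum Decision only mirrors the names of Flink's TriggerResult values. The event type "end", the 5-minute interval, and using an event count as the "summary" are assumptions made for the sketch. Periodic timers emit the summary but keep it; an end event emits it and drops the session.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Stand-in mirroring the names of Flink's TriggerResult values; this is NOT Flink's class.
// FIRE corresponds to each periodic emission performed in advanceTo().
enum Decision { CONTINUE, FIRE, FIRE_AND_PURGE }

// Plain-Java model: emit every interval, emit and drop on an end event, keyed by sessionId.
// The per-session "summary" is just an event count; "end" is an assumed end-event type.
final class PeriodicOrEndModel {
    private final long intervalMs;
    private final Map<String, Integer> counts = new TreeMap<>();  // summary per session
    private final Map<String, Long> nextFire = new TreeMap<>();   // next periodic emission per session
    final List<String> emitted = new ArrayList<>();               // what would be sent downstream

    PeriodicOrEndModel(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    // Analogous to Trigger.onElement: aggregate the event, then decide whether to emit.
    Decision onElement(String sessionId, String type, long now) {
        counts.merge(sessionId, 1, Integer::sum);
        if ("end".equals(type)) {
            emitted.add(sessionId + "=" + counts.get(sessionId));
            counts.remove(sessionId);    // "purge": the session's summary is dropped
            nextFire.remove(sessionId);  // and so is its periodic timer
            return Decision.FIRE_AND_PURGE;
        }
        nextFire.putIfAbsent(sessionId, now + intervalMs); // first event schedules the first timer
        return Decision.CONTINUE;
    }

    // Analogous to processing-time timers firing: emit each due summary but keep it.
    void advanceTo(long now) {
        for (Map.Entry<String, Long> timer : nextFire.entrySet()) {
            long t = timer.getValue();
            while (t <= now) {
                emitted.add(timer.getKey() + "=" + counts.get(timer.getKey()));
                t += intervalMs;
            }
            timer.setValue(t); // reschedule, like re-registering the timer in onProcessingTime
        }
    }
}
```

For example, with a 300,000 ms interval, two events followed by advanceTo(300_000) emit "s1=2"; two more events ending with an "end" event then emit "s1=4", and later timer advances emit nothing for that session.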

On Wed, Apr 28, 2021 at 5:25 PM Tim Josefsson <[hidden email]> wrote:

Re: How to implement a window that emits events at regular intervals and on specific events

Tim Josefsson
Thanks for the suggestions! I'll see if I can implement something that works!
A follow-up question, more related to state: if I implement either the custom trigger or the process function, how will they handle crashes and such? For instance, if I have a checkpointing interval of 10 s, will the job recover from the last checkpoint with all the summaries as they were at that point, or do I have to implement specific ValueStates in both cases?

On Thu, 29 Apr 2021 at 10:25, Till Rohrmann <[hidden email]> wrote:


--

Tim Josefsson

Webstep GPtW
mobil   +46 (0) 707 81 91 12
telefon +46 (0) 8 21 40 70

[hidden email] 
webstep.se
Suttungs gränd 2
753 19 Uppsala

Stockholm | Uppsala | Malmö | Sundsvall | Oslo 
Bergen | Stavanger | Trondheim | Kristiansand


Re: How to implement a window that emits events at regular intervals and on specific events

Till Rohrmann
If you use the Trigger API, then you don't have to do anything special for fault tolerance. When using the ProcessFunction, you should store your state in Flink's state primitives (e.g. ValueState); Flink then checkpoints this state automatically. In case of a failure, Flink always resumes from the latest successfully completed checkpoint.

Cheers,
Till
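The recovery behaviour described above can be sketched as a plain-Java model (not Flink code): keyed state lives in a map standing in for one ValueState per sessionId, a checkpoint copies that state together with the input position, and recovery reloads the copy and rewinds the input. The replayable LOG is an assumption standing in for a Kafka partition whose offsets are part of the checkpoint.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of checkpoint/restore for keyed state; NOT Flink code.
// Assumption: the input can be replayed from an offset, as a Kafka partition can.
final class CheckpointModel {
    static final List<String> LOG = List.of("s1", "s2", "s1", "s1", "s2"); // sessionIds of incoming events

    Map<String, Integer> state = new HashMap<>(); // per-key state, like one ValueState<Integer> per sessionId
    int offset = 0;                                // next LOG position to read

    void processUntil(int end) {
        while (offset < end) {
            state.merge(LOG.get(offset), 1, Integer::sum);
            offset++;
        }
    }

    // A completed checkpoint: a copy of all keyed state plus the input position it belongs to.
    static final class Snapshot {
        final Map<String, Integer> state;
        final int offset;

        Snapshot(Map<String, Integer> state, int offset) {
            this.state = new HashMap<>(state);
            this.offset = offset;
        }
    }

    Snapshot checkpoint() {
        return new Snapshot(state, offset);
    }

    // Recovery: drop in-memory state, reload the snapshot, rewind the input to the checkpointed offset.
    static CheckpointModel restore(Snapshot s) {
        CheckpointModel m = new CheckpointModel();
        m.state = new HashMap<>(s.state);
        m.offset = s.offset;
        return m;
    }
}
```

Processing two events, checkpointing, processing two more, "crashing", and then restoring and replaying the rest ends with the same state as a run without failure ({s1=3, s2=2}). Note that results already emitted after the checkpoint (for example by print()) can be emitted again during replay unless the sink is transactional or idempotent.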

On Thu, Apr 29, 2021 at 12:25 PM Tim Josefsson <[hidden email]> wrote:

Re: How to implement a window that emits events at regular intervals and on specific events

Tim Josefsson
Thanks! I've managed to implement a working solution with the trigger API, but I'm not exactly sure why it works.
I'm doing the following:
DataStream<SessionSummary> summaries = env
        .addSource(kafkaConsumer, "playerEvents(Kafka)")
        .name("EP - Read player events from Kafka")
        .uid("EP - Read player events from Kafka")
        .map(json -> DECODER.decode(json, TypeToken.of(HashMap.class))).returns(HashMap.class)
        .name("EP - Map Json to HashMap")
        .uid("EP - Map Json to HashMap")
        // Events without a sessionId cannot be assigned to a session
        .filter((FilterFunction<HashMap>) event -> event.get(Field.SESSION_ID) != null)
        .name("EP - Remove any events without sessionId since they shouldn't generate sessions.")
        .uid("EP - Remove any events without sessionId since they shouldn't generate sessions.")
        // A FilterFunction must return a boolean: keep only events that carry an accountId
        .filter((FilterFunction<HashMap>) event -> event.get(Field.ACCOUNT_ID) != null)
        .keyBy((KeySelector<HashMap, String>) event -> (String) event.get(Field.SESSION_ID))
        // A session closes after 2 minutes (processing time) without events for its sessionId
        .window(ProcessingTimeSessionWindows.withGap(org.apache.flink.streaming.api.windowing.time.Time.minutes(2)))
        .trigger(new SessionTrigger())
        .aggregate(new SummaryAggregator())
        .name("EP - Aggregate events into session summaries")
        .uid("EP - Aggregate events into session summaries");

summaries.print();
With the following trigger (omitting parts of the trigger):
[ ... ]
@Override
public TriggerResult onElement(HashMap element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
    // Trigger state scoped to the current window: have its timers been registered yet?
    ValueState<Boolean> firstSeen = ctx.getPartitionedState(
            new ValueStateDescriptor<>("firstSeen", Types.BOOLEAN));

    // If an end event is detected, emit the content and purge
    if (endSession.contains(element.get(Field.EVENT_TYPE))) {
        return TriggerResult.FIRE_AND_PURGE;
    }

    if (firstSeen.value() == null) {
        // Periodic emission: first timer 10 s from now (re-registered in onProcessingTime)
        ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + 10000L);
        // Final emission when the session window ends
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        firstSeen.update(true);
    }
    logger.info("Current window end is {} for session {}", window.maxTimestamp(), element.get(Field.SESSION_ID));
    return TriggerResult.CONTINUE;
}

@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
    // Emit the current result every time the processing time trigger fires
    if (time == window.maxTimestamp()) {
        // Session gap elapsed: emit the final summary and clear the window contents
        return TriggerResult.FIRE_AND_PURGE;
    } else {
        // Periodic timer: schedule the next one and emit without purging
        ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + 10000L);
        return TriggerResult.FIRE;
    }
}
[ ... ]
So what I'm doing is calling ctx.registerProcessingTimeTimer(window.maxTimestamp()), but only once, on the first event. Yet when testing, it works as I want: it fires every ten seconds and then fires and purges only after no events have been received for 2 minutes (the gap specified for the session window). Is the processing-time timer being updated every time the window end time is increased (I noticed Flink does this in the background every time a new event arrives)?

I'm happy with my solution, just trying to figure out how things work!

Cheers, 
Tim
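Since a FilterFunction must return a boolean, each filter predicate in a pipeline like the one above should test for presence explicitly (event.get(...) != null); returning the field value itself does not compile. The filter-then-keyBy step can be modeled in plain Java (not Flink code) as below. The field names "sessionId" and "accountId" are hypothetical stand-ins for Field.SESSION_ID and Field.ACCOUNT_ID, whose values are not shown in the thread, and grouping into a map only approximates keyBy, which partitions the stream rather than collecting it.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Plain-Java model of the two filters and the keyBy step; NOT Flink code.
// Field names are hypothetical stand-ins for Field.SESSION_ID and Field.ACCOUNT_ID.
final class PreWindowModel {
    static final String SESSION_ID = "sessionId";
    static final String ACCOUNT_ID = "accountId";

    // Predicates must yield a boolean, so presence is tested explicitly with != null.
    static final Predicate<Map<String, Object>> HAS_SESSION = e -> e.get(SESSION_ID) != null;
    static final Predicate<Map<String, Object>> HAS_ACCOUNT = e -> e.get(ACCOUNT_ID) != null;

    // Keeps events that have both fields and groups them by sessionId (sorted for stable output).
    static Map<String, List<Map<String, Object>>> keyBySession(List<Map<String, Object>> events) {
        return events.stream()
                .filter(HAS_SESSION.and(HAS_ACCOUNT))
                .collect(Collectors.groupingBy(
                        e -> (String) e.get(SESSION_ID), TreeMap::new, Collectors.toList()));
    }
}
```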


On Thu, 29 Apr 2021 at 18:42, Till Rohrmann <[hidden email]> wrote:

Re: How to implement a window that emits events at regular intervals and on specific events

Till Rohrmann
Hi Tim,

The way session windows work is by first creating a new window for every incoming event and then merging overlapping windows. That's why you see that the end time of a window increases with every new incoming event. I hope this explains what you are seeing. Apart from that, I think the SessionTrigger looks good to me.

Cheers,
Till
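Till's explanation can be made concrete with a plain-Java model of session-window merging (not Flink code). Window mimics Flink's TimeWindow: a half-open range [start, end) whose maxTimestamp() is end - 1, and windows that overlap or touch are merged into one covering both. With the 2-minute gap from Tim's pipeline, each event's own window [t, t + gap) absorbs the existing session window, so the window handed to the trigger after each event is a new, longer one. In Flink itself, a merging assigner such as ProcessingTimeSessionWindows only accepts a trigger whose canMerge() returns true, and Trigger.onMerge(...) is then called for the merged window; the built-in ProcessingTimeTrigger registers a timer at the merged window's maxTimestamp() there, and the omitted parts of SessionTrigger presumably do something similar. Trigger state from getPartitionedState is also scoped per window and a ValueState is not merged automatically, so firstSeen is likely empty again for each newly merged window.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Plain-Java model of how session windows grow by merging; NOT Flink code.
final class SessionMergeModel {
    // Half-open window [start, end), like Flink's TimeWindow; maxTimestamp() is end - 1.
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        long maxTimestamp() { return end - 1; }

        // Overlapping or touching windows count as intersecting.
        boolean intersects(Window o) { return start <= o.end && end >= o.start; }

        Window cover(Window o) { return new Window(Math.min(start, o.start), Math.max(end, o.end)); }
    }

    final long gapMs;
    final List<Window> windows = new ArrayList<>(); // current sessions for one key, pairwise disjoint

    SessionMergeModel(long gapMs) {
        this.gapMs = gapMs;
    }

    // Assigns [t, t + gap) to the event, merges it with every intersecting session, returns the result.
    Window add(long t) {
        Window merged = new Window(t, t + gapMs);
        Iterator<Window> it = windows.iterator();
        while (it.hasNext()) {
            Window w = it.next();
            if (w.intersects(merged)) {
                merged = merged.cover(w);
                it.remove(); // the old window is replaced by the merged one
            }
        }
        windows.add(merged);
        return merged;
    }
}
```

With a 120,000 ms gap, an event at 0 yields a window with maxTimestamp() 119999; an event at 30,000 merges into [0, 150000) with maxTimestamp() 149999; an event at 400,000 starts a separate session.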

On Fri, Apr 30, 2021 at 9:27 AM Tim Josefsson <[hidden email]> wrote: