[DataStream API] Best practice to handle custom session window - time gap based but handles event for ending session

Jungtaek Lim
Hi Flink users,

I've been spending some time learning and playing with the Flink DataStream API, as a beginner rather than an expert. :)

To try out the custom window API, I created a small example: a session window based on a fixed time gap, where an event can also be of a type that marks the "end of session". I guess this kind of thing (like manual logout and login) is not unusual, though I don't have a concrete real-world use case.
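To pin down the intended semantics before any Flink code is involved, here is a minimal plain-Python sketch (not Flink API) that splits one user's events into sessions. It assumes integer event-time timestamps, that the complete list of events is available up front, and two hypothetical event kinds, "game" and "logout". A session closes when the next event is more than `gap` after the previous one, or immediately after a "logout".

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Event:
    ts: int    # event time in integer units (assumed)
    kind: str  # hypothetical kinds: "game" or "logout"


def sessionize(events: List[Event], gap: int) -> List[List[Event]]:
    """Split one user's events into sessions (reference semantics, not Flink code).

    A session closes when the next event is more than `gap` after the previous
    one, or immediately after a "logout" event.
    """
    sessions: List[List[Event]] = []
    current: List[Event] = []
    for ev in sorted(events, key=lambda e: e.ts):
        if current and ev.ts - current[-1].ts > gap:
            sessions.append(current)  # inactivity gap exceeded
            current = []
        current.append(ev)
        if ev.kind == "logout":
            sessions.append(current)  # explicit end of session
            current = []
    if current:
        sessions.append(current)  # whatever remains at the end of the input
    return sessions
```

For example, events at 0, 5, 7 (logout), 9 and 30 with a gap of 10 yield the sessions [0, 5, 7], [9] and [30].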

Here is an implementation based on the Flink DataStream API:

The custom window works pretty well, and I could use side outputs very easily. One thing that made the code a bit longer was the new definition of TimeWindow (to handle the "close session" event type). Even though I tried to reuse TimeWindow via inheritance, the code became somewhat verbose because I also had to implement a new serializer.
(It actually required a new Trigger as well, but I used a workaround to reuse the existing EventTimeTrigger.)
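One way to picture the custom-window approach: Flink's event-time session windows assign each event a window [ts, ts + gap) and merge overlapping windows. The sketch below is a hypothetical plain-Python model of that merging step, extended with a `closed` flag so that a window containing a logout stops absorbing later windows. It is not the author's TimeWindow subclass; `SessionWindow`, `assign` and `merge_windows` are illustrative names, and the "game"/"logout" kinds are assumed.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class SessionWindow:
    start: int
    end: int      # exclusive
    closed: bool  # hypothetical flag: the window contains a logout


def assign(ts: int, kind: str, gap: int) -> SessionWindow:
    # A logout gets a one-unit window ending right after it and is marked closed;
    # other events get the usual [ts, ts + gap) session window.
    if kind == "logout":
        return SessionWindow(ts, ts + 1, True)
    return SessionWindow(ts, ts + gap, False)


def merge_windows(windows: List[SessionWindow]) -> List[SessionWindow]:
    merged: List[SessionWindow] = []
    for w in sorted(windows, key=lambda win: win.start):
        last = merged[-1] if merged else None
        if last is not None and not last.closed and w.start < last.end:
            # Overlaps an open window: merge. A logout truncates the end.
            end = w.end if w.closed else max(last.end, w.end)
            merged[-1] = SessionWindow(last.start, end, w.closed)
        else:
            # No overlap, or the previous window was already closed by a logout.
            merged.append(w)
    return merged
```

Because `merge_windows` re-merges the full set of windows from scratch, it never has to undo a merge. An incremental merging assigner only ever merges, so a logout that arrives late cannot pull an already-merged later event back out of the window.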

Assuming this pattern is not unusual (it would be perfectly fine if it is), could someone point out ways to improve or simplify the code? It would be really nice if there were something I could contribute here.

Thanks,
Jungtaek Lim (HeartSaVioR)

P.S. Here is an implementation based on Spark Structured Streaming (it has no custom window API, so I had to put everything in the state function of flatMapGroupsWithState):

Re: [DataStream API] Best practice to handle custom session window - time gap based but handles event for ending session

Dongwon Kim
Hi Jungtaek,

I faced a similar problem in the past: we needed to calculate an aggregate upon receiving an end message from each user.

While you are trying to solve the problem by defining a custom window assigner, I took a different approach and implemented a custom trigger.

You can see my implementation at the following link (though I'm not quite sure it can solve your case):

Best,
Dongwon 

P.S. FYI, I presented the background of the problem and the general idea last year at Flink Forward Berlin 2017. I hope this presentation helps: https://www.youtube.com/watch?v=wPQWFy5JENw



(In reply to Jungtaek Lim's message of Sun, Aug 4, 2019 at 10:57 PM)

Re: [DataStream API] Best practice to handle custom session window - time gap based but handles event for ending session

Jungtaek Lim
Thanks, Dongwon, for the feedback and for sharing your approach!

I'm not sure whether it's possible (I'm not an expert), but if the intermediate (aggregated) result could be reset after processing the "fire" event, I guess it would work as expected: the window would still keep expanding after the "session end", but the outputs would be the same. Nice approach! I'll play with it too.
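The "reset after firing" idea corresponds to a trigger that fires and purges. Below is a plain-Python simulation for one key with in-order events only. The enum names mirror Flink's `TriggerResult` values (`CONTINUE`, `FIRE`, `FIRE_AND_PURGE`), but everything else (the count aggregate, the gap handling, the `run` helper, the "game"/"logout" kinds) is a simplified assumption, not Flink's window operator. With `purge=False` the aggregate keeps growing after the logout, which shows why the reset matters.

```python
from enum import Enum, auto
from typing import List, Tuple


class TriggerResult(Enum):
    # Names mirror Flink's TriggerResult values; this enum is a plain-Python stand-in.
    CONTINUE = auto()
    FIRE = auto()
    FIRE_AND_PURGE = auto()


def on_element(kind: str, purge: bool) -> TriggerResult:
    # Hypothetical trigger: fire on an end-of-session event, optionally clearing state.
    if kind != "logout":
        return TriggerResult.CONTINUE
    return TriggerResult.FIRE_AND_PURGE if purge else TriggerResult.FIRE


def run(events: List[Tuple[int, str]], gap: int, purge: bool = True) -> List[int]:
    """Simulate one key's session window over in-order events; return emitted counts."""
    outputs: List[int] = []
    count = 0  # intermediate aggregate kept in window state
    last_ts = 0
    for ts, kind in events:
        if count and ts - last_ts > gap:
            outputs.append(count)  # gap elapsed: the time-based trigger fires
            count = 0
        count += 1
        last_ts = ts
        result = on_element(kind, purge)
        if result is not TriggerResult.CONTINUE:
            outputs.append(count)  # emit the current aggregate
            if result is TriggerResult.FIRE_AND_PURGE:
                count = 0  # purge: reset the aggregate
    if count:
        outputs.append(count)  # end of input: fire what remains
    return outputs
```

With events at 0, 5, 7 (logout), 9 and 30 and a gap of 10, `run(events, 10)` yields [3, 1, 1], while `run(events, 10, purge=False)` yields [3, 4, 1]: the second output re-counts the events before the logout.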

Thanks again,
Jungtaek Lim (HeartSaVioR)

(In reply to Dongwon Kim's message of Mon, Aug 5, 2019 at 12:01 AM)

Re: [DataStream API] Best practice to handle custom session window - time gap based but handles event for ending session

Fabian Hueske
Hi Jungtaek,

I would recommend implementing the logic in a ProcessFunction and avoiding Flink's windowing API.
IMO, the windowing API is difficult to use because there are many pieces, such as WindowAssigner, Window, Trigger, Evictor and WindowFunction, that are orchestrated by Flink.
This makes it very hard to understand what exactly is going on and to ensure that no state is leaked.

For example, I think your solution is not 100% correct, because a MergingWindowAssigner cannot split a window.
With out-of-order events, a LOG OUT event for 12:00:00 might arrive after a game event for 12:00:01 has already been assigned to a window.
In that case, you'd need to split the window, and the game event at 12:00:01 would have to go to the next session window.
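To make the 12:00:00 / 12:00:01 example concrete, here is a plain-Python sketch of the split that a merge-only assigner cannot perform. It assumes timestamps in seconds since midnight, hypothetical "game"/"logout" kinds, and that the window still holds the raw events (a pre-aggregated value such as a running count could not be split at all). An event with the same timestamp as the logout is assumed to stay in the earlier session.

```python
from typing import List, Tuple

Event = Tuple[int, str]  # (seconds since midnight, hypothetical kind)


def to_seconds(hhmmss: str) -> int:
    h, m, s = (int(p) for p in hhmmss.split(":"))
    return h * 3600 + m * 60 + s


def split_at(session: List[Event], logout_ts: int) -> Tuple[List[Event], List[Event]]:
    """Split a session's raw events at a late logout: events up to and including
    the logout stay, later ones move to the next session."""
    before = sorted(e for e in session if e[0] <= logout_ts)
    after = sorted(e for e in session if e[0] > logout_ts)
    return before, after


# Arrival order differs from event-time order: the 12:00:00 logout arrives last.
arrivals = [(to_seconds("11:59:55"), "game"),
            (to_seconds("12:00:01"), "game"),
            (to_seconds("12:00:00"), "logout")]

# A merge-only assigner has already put every event into one session ...
merged_session = list(arrivals)
# ... but the correct result requires splitting it at the logout.
first, second = split_at(merged_session, to_seconds("12:00:00"))
```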

As I said, I would use a ProcessFunction because it is a single function and provides access to state and timers, which is all you need.
You would have to implement a bit more logic yourself, but it would be much easier to reason about how the data is processed.
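A rough outline of that ProcessFunction approach, written as a plain-Python stand-in rather than against Flink's `KeyedProcessFunction` API. Keyed "list state" buffers raw events, and each event registers an event-time timer: at its own timestamp for a logout, at `ts + gap` otherwise. When the watermark passes a timer, every session whose end is already certain is emitted and removed from state. The sketch assumes no events arrive behind the watermark and uses hypothetical "game"/"logout" kinds; the class and method names are illustrative, not Flink's.

```python
from collections import defaultdict
from typing import Dict, List, Set, Tuple

Event = Tuple[int, str]  # (event-time timestamp in seconds, hypothetical kind)


class SessionProcessFunction:
    """Plain-Python stand-in for a keyed ProcessFunction (not Flink's API)."""

    def __init__(self, gap: int) -> None:
        self.gap = gap
        self.buffer: Dict[str, List[Event]] = defaultdict(list)  # per-key "list state"
        self.timers: Dict[str, Set[int]] = defaultdict(set)      # per-key event-time timers
        self.output: List[Tuple[str, List[Event]]] = []          # emitted sessions

    def process_element(self, key: str, ts: int, kind: str) -> None:
        self.buffer[key].append((ts, kind))
        # Earliest watermark at which this event can close a session.
        self.timers[key].add(ts if kind == "logout" else ts + self.gap)

    def advance_watermark(self, wm: int) -> None:
        # Call on_timer once for each key that has at least one timer <= wm.
        for key in list(self.timers):
            due = {t for t in self.timers[key] if t <= wm}
            if due:
                self.timers[key] -= due
                self.on_timer(key, wm)

    def on_timer(self, key: str, wm: int) -> None:
        events = sorted(self.buffer[key])
        current: List[Event] = []
        consumed = 0
        for i, (ts, kind) in enumerate(events):
            current.append((ts, kind))
            if kind == "logout":
                if ts > wm:
                    break  # earlier events of this session may still arrive
            else:
                nxt = events[i + 1][0] if i + 1 < len(events) else None
                if nxt is not None and nxt - ts <= self.gap:
                    continue  # the next buffered event extends the session
                if ts + self.gap > wm:
                    break  # an event within the gap may still arrive
            # The session ending at this event is final: emit it, drop it from state.
            self.output.append((key, current))
            current, consumed = [], i + 1
        self.buffer[key] = events[consumed:]
```

Replaying the out-of-order case (11:59:55 game, 12:00:01 game, then the late 12:00:00 logout, gap 10 s), a watermark of 12:00:00 emits only the 11:59:55 game and the logout; the 12:00:01 game stays buffered until the watermark reaches 12:00:11.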

Best,
Fabian


(In reply to Jungtaek Lim's message of Mon, Aug 5, 2019 at 05:18)

Re: [DataStream API] Best practice to handle custom session window - time gap based but handles event for ending session

Jungtaek Lim
Thanks, Fabian, for the great input!

Regarding your feedback on the solution: yes, you're right. I realized I missed out-of-order events, and as you said, we would have to "split" the existing window in two, which the current custom window abstraction can't help with. (Flink would have no idea how the aggregated events or intermediate values, i.e. the state, should be assigned to the new windows. Unlike merging windows, this would have to be handled manually.) The trigger approach might still be viable, but it would have to take the current watermark into account as well.

Honestly, the purpose of the experiment was to see how far the custom window API and custom triggers (a higher-level abstraction) can go, compared with using a ProcessFunction (lower level; flatMapGroupsWithState in Spark's case). If we end up handling most things in a ProcessFunction anyway, the extra complexity of the abstraction brings fewer benefits. I guess I just couldn't find a proper use case for it.

Btw, I'd like to say thanks: I'm learning Flink with the new book "Stream Processing with Apache Flink". :) Thanks for your amazing effort in publishing such a nice book!

Thanks,
Jungtaek Lim (HeartSaVioR)


(In reply to Fabian Hueske's message of Mon, Aug 5, 2019 at 10:21 PM)

Re: [DataStream API] Best practice to handle custom session window - time gap based but handles event for ending session

Fabian Hueske
Hi Jungtaek,

Sorry for the slow reply, and thanks for the feedback on the book! :-)

As I said, I don't think Flink's windowing API is well suited to manually terminated session windows, due to the lack of support for splitting windows.
Given that Spark has similar support for timers, I guess the implementation would look pretty much alike in both systems.

Best, Fabian

(In reply to Jungtaek Lim's message of Tue, Aug 6, 2019 at 17:14)