Hi,
I am attempting to use the new window APIs in streaming to implement session-based windows, and I am not sure whether the currently provided functionality handles my use case. Specifically, I want something conceptually similar to a "Sessions.withGapDuration(…)" window in Google Cloud Dataflow.

Assuming the events are keyed by session id, I would like to use event time and the watermarking functionality to trigger a window after the "end of a session" (no events for a given session received within x amount of time). With watermarking, this would mean triggering when a watermark is seen that is greater than (the time of the last event + session timeout). I also want to trigger the window early once a given number of events has been received.

Is this currently possible with the existing combination of window assigners and triggers? I am happy to write custom triggers etc., but wanted to make sure it wasn't already available before going down that road.

Thanks,
Paul Hamilton
Hybris Software
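One way to pin down the firing rule described above is to model the trigger decisions for a single session key in plain Java. This is a minimal sketch, not Flink's Trigger API: the class SessionTriggerModel, its methods onElement/onWatermark and the Result values are hypothetical names chosen for illustration. It assumes timestamps and the gap are in milliseconds and that an early firing only resets the element counter, leaving the session open.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, framework-free model of the session firing rule for ONE key.
 * Not Flink API: all names here are illustrative.
 */
public class SessionTriggerModel {

    /** Outcomes: do nothing, fire early on element count, or the session has ended. */
    enum Result { CONTINUE, EARLY_FIRE, SESSION_END }

    private final long gapMillis;    // session timeout ("x amount of time")
    private final int maxElements;   // early-firing threshold
    private long lastEventTime = Long.MIN_VALUE; // no element seen yet
    private int count = 0;           // elements seen since the last early firing

    SessionTriggerModel(long gapMillis, int maxElements) {
        this.gapMillis = gapMillis;
        this.maxElements = maxElements;
    }

    /** Called for every element of this session with its event timestamp. */
    Result onElement(long eventTime) {
        // Track the latest event time, even if elements arrive out of order.
        lastEventTime = Math.max(lastEventTime, eventTime);
        count++;
        if (count >= maxElements) {
            count = 0; // early firing; the session itself stays open
            return Result.EARLY_FIRE;
        }
        return Result.CONTINUE;
    }

    /** The session ends once a watermark is strictly greater than last event + gap. */
    Result onWatermark(long watermark) {
        if (lastEventTime != Long.MIN_VALUE && watermark > lastEventTime + gapMillis) {
            return Result.SESSION_END;
        }
        return Result.CONTINUE;
    }

    public static void main(String[] args) {
        SessionTriggerModel t = new SessionTriggerModel(1_000, 3);
        List<Result> results = new ArrayList<>();
        results.add(t.onElement(100));
        results.add(t.onElement(400));
        results.add(t.onWatermark(1_300)); // 1300 is not > 400 + 1000
        results.add(t.onElement(900));     // third element triggers the early firing
        results.add(t.onWatermark(1_901)); // 1901 > 900 + 1000, so the session ends
        System.out.println(results); // prints [CONTINUE, CONTINUE, CONTINUE, EARLY_FIRE, SESSION_END]
    }
}
```

Whether an early firing should also purge the buffered elements is a separate design choice that this model deliberately leaves out.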
Hi Paul,
it's good to see people interested in this. I sketched a Trigger that should fit your requirements: https://gist.github.com/aljoscha/a7c6f22548e7d24bc4ac

You can use it like this:

    DataStream<> input = …
    DataStream<> result = input
        .keyBy("session-id")
        .window(GlobalWindows.create())
        .trigger(new SessionTrigger(timeout, maxElements))
        .apply(new MyWindowFunction());

The Trigger uses the new state API that I'm currently introducing in a new pull request. It should be merged very soon, before the 0.10 release.

This implementation has one caveat, though: it cannot deal with elements of different sessions that arrive intermingled with each other. The reason is that Flink does not yet support merging the windows assigned by the WindowAssigner, which the Cloud Dataflow API, for example, does support. This means that elements cannot be assigned to session windows; instead, the workaround with the GlobalWindow has to be used. I want to tackle this for the release after 0.10, however.

Please let us know if you need more information. I'm always happy to help with these interesting cases at the bleeding edge of what is possible. :-)

Cheers,
Aljoscha
Hi Aljoscha,
Thanks a lot for your Trigger implementation; it definitely helped provide some direction, and it appears to be working well for our use case. One thing I have noticed now that I have pulled in the state API changes is that key-based state within a window function does not appear to be working. Perhaps I am not using it correctly now that the API has changed. Previously we had done something like this within the RichWindowFunction:

    @Override
    public void open(final Configuration parameters) throws Exception {
        state = getRuntimeContext().getOperatorState("state", new StatePojo(), true);
    }

Based on the API changes, I switched it to:

    @Override
    public void open(final Configuration parameters) throws Exception {
        state = getRuntimeContext().getKeyValueState("state", StatePojo.class, new StatePojo());
    }

But the state doesn't seem to be partitioned by key. I haven't had much time to play around with it, so it's certainly possible that I messed something up while refactoring to the new API. I will look at it further when I get a chance, but if you have any thoughts they are much appreciated.

Thanks,
Paul Hamilton
Hi Paul,
good to hear that the windowing works for you.

With the key-based state, I'm afraid you found a bug. The problem is that the state backend is not set to the right key when the window is evaluated. I will look into fixing this ASAP, before the 0.10 release.

Cheers,
Aljoscha
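To illustrate why a missing key context makes state look unpartitioned, here is a toy model of key-partitioned state in plain Java. It is hypothetical and is not Flink's state backend: ToyKeyedState, setCurrentKey and evaluateWindow are illustrative names. The idea it shows is that every read and write is resolved against a "current key"; if that key is not set to the window's key before the window function runs, evaluations for different keys end up sharing one slot.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical toy model of key-partitioned state; not Flink's state backend API. */
public class KeyedStateDemo {

    /** Stores one value per key; value()/update() act on whatever key is current. */
    static final class ToyKeyedState<K, V> {
        private final Map<K, V> values = new HashMap<>();
        private final V defaultValue;
        private K currentKey; // stays null until setCurrentKey is called

        ToyKeyedState(V defaultValue) { this.defaultValue = defaultValue; }

        void setCurrentKey(K key) { this.currentKey = key; }

        V value() { return values.getOrDefault(currentKey, defaultValue); }

        void update(V v) { values.put(currentKey, v); }
    }

    /** Simulates evaluating the window for `key`: the window function bumps a per-key counter. */
    static int evaluateWindow(ToyKeyedState<String, Integer> state, String key, boolean setKey) {
        if (setKey) {
            state.setCurrentKey(key); // the step that is skipped in the buggy variant
        }
        int next = state.value() + 1;
        state.update(next);
        return next;
    }

    public static void main(String[] args) {
        ToyKeyedState<String, Integer> correct = new ToyKeyedState<>(0);
        evaluateWindow(correct, "a", true);
        evaluateWindow(correct, "b", true);
        System.out.println(evaluateWindow(correct, "a", true)); // counter for "a" only: 2

        ToyKeyedState<String, Integer> buggy = new ToyKeyedState<>(0);
        evaluateWindow(buggy, "a", false);
        evaluateWindow(buggy, "b", false);
        System.out.println(evaluateWindow(buggy, "a", false)); // one shared counter: 3
    }
}
```

With the key set, the counter for "a" reaches 2 independently of "b"; without it, all three evaluations read and write the same counter, which is the kind of unpartitioned behavior Paul observed.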
Hi Paul,
the key-based state should now be fixed in the current 0.10-SNAPSHOT builds, if you want to continue playing around with it.

Cheers,
Aljoscha
Hi Aljoscha,
sorry to bother you again (this time about this old thread), just a short question about the caveat you mention in your answer. You wrote that events of different sessions cannot be intermingled. Isn't the whole point of the keyBy("session-id") expression in your example to avoid intermingled sessions by first grouping by session id?

Cheers and thank you,

Konstantin

-- 
Konstantin Knauf * [hidden email] * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Managing Directors: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Registered office: Unterföhring * Amtsgericht München * HRB 135082
We were also trying to address session windowing, but took a slightly different approach to which window we place the event into.

We did not want the "triggering event" to be purged as part of the window it triggered; instead, we wanted it to start a new window, while the old window fires and purges on an event-time timeout. Take a look and see if it is useful: https://bitbucket.org/snippets/vstoyak/o9Rqp

Vladimir
Hi Konstantin,
you are right: if the stream is keyed by the session id, then it works.

I was referring to the case where you have, for example, some interactions with timestamps and you want to derive the sessions from them. In that case, events that should belong to one session (based on their timestamps) can arrive intermixed with events that should belong to another session, because of delays (and because elements never really arrive in timestamp order). Does this make clear what I meant? It's a bit tricky, so I can draw a picture if it helps.

Cheers,
Aljoscha
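The window merging mentioned above can be sketched in plain Java for a single key. This is an illustration under assumptions, not Flink's WindowAssigner API: each element is assigned the half-open interval [timestamp, timestamp + gap), and any existing sessions that overlap the new interval are merged into one. The names SessionMerging, Window and add are hypothetical, and the record syntax requires Java 16 or later.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of merging session windows for a single key; not Flink's API. */
public class SessionMerging {

    /** Half-open event-time interval [start, end). */
    record Window(long start, long end) {
        boolean intersects(Window o) { return start < o.end && o.start < end; }

        Window cover(Window o) { return new Window(Math.min(start, o.start), Math.max(end, o.end)); }
    }

    /**
     * Assigns [ts, ts + gap) to a new element and merges it with every overlapping session.
     * Expects `sessions` sorted by start and pairwise disjoint; the result keeps that property.
     */
    static List<Window> add(List<Window> sessions, long ts, long gap) {
        Window merged = new Window(ts, ts + gap);
        List<Window> result = new ArrayList<>();
        for (Window w : sessions) {
            if (w.intersects(merged)) {
                merged = merged.cover(w); // overlapping sessions collapse into one
            } else {
                result.add(w);            // untouched session survives as-is
            }
        }
        result.add(merged);
        result.sort((a, b) -> Long.compare(a.start(), b.start()));
        return result;
    }

    public static void main(String[] args) {
        long gap = 10;
        List<Window> sessions = new ArrayList<>();
        // Timestamps for ONE key, arriving out of order.
        for (long ts : new long[] {0, 25, 12, 5}) {
            sessions = add(sessions, ts, gap);
            System.out.println("after " + ts + ": " + sessions);
        }
    }
}
```

Processing the out-of-order timestamps 0, 25, 12, 5 with a gap of 10 first yields three separate sessions; the late element at 5 then bridges [0, 10) and [12, 22), leaving [0, 22) and [25, 35). This is the situation Aljoscha describes, where the session an element belongs to can only be decided once later-arriving elements are known.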
Hi Aljoscha,
thanks, that's what I thought. I just wanted to verify that keyBy + SessionWindow() works with intermingled events.

Cheers,

Konstantin