Session Based Windows

9 messages

Hamilton, Paul
Hi,

I am attempting to use the new window APIs in streaming to implement a
session-based window, and I am not sure whether the currently provided
functionality handles my use case.  Specifically, I want something
conceptually similar to a “Sessions.withGapDuration(…)” window in Google
Cloud Dataflow.

Assume the events are keyed by session id.  I would like to use event time
and watermarks to trigger a window after the “end of a session” (no events
for a given session received within some timeout x).  With watermarks, this
means firing when a watermark arrives that is greater than (timestamp of the
last event + session timeout).  I also want to fire the window early once a
given number of events has been received.

Is this possible with the current combination of window assigners and
triggers?  I am happy to write custom triggers, etc., but wanted to make
sure this isn’t already available before going down that road.

Thanks,

Paul Hamilton
Hybris Software
 


Re: Session Based Windows

Aljoscha Krettek
Hi Paul,
it’s good to see people interested in this. I sketched a Trigger that should fit your requirements: https://gist.github.com/aljoscha/a7c6f22548e7d24bc4ac

You can use it like this:

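// Event and Result are placeholder types; Event is assumed to be a POJO with a sessionId field
DataStream<Event> input = …;
DataStream<Result> result = input
  .keyBy("sessionId")
  .window(GlobalWindows.create())
  .trigger(new SessionTrigger(timeout, maxElements))
  .apply(new MyWindowFunction());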

The Trigger uses the new state API that I’m currently introducing in a new Pull Request. It should be merged very soon, before the 0.10 release.
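To make the GlobalWindow-plus-trigger approach concrete, here is a plain-Java model of the per-key bookkeeping such a trigger needs. It is not the code from the gist linked above and uses no Flink classes; SessionTriggerModel and its methods are hypothetical. It assumes that early firings keep the buffered elements, while the final firing on the session timeout emits and purges them.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of keyBy + GlobalWindows + a session trigger: one open buffer per key. */
final class SessionTriggerModel {
    private final long timeout;
    private final int maxElements;
    private final Map<String, List<Long>> buffers = new HashMap<>();
    private final Map<String, Long> deadlines = new HashMap<>(); // latest timestamp + timeout, per key
    final List<String> emitted = new ArrayList<>();              // log of firings, for inspection

    SessionTriggerModel(long timeout, int maxElements) {
        this.timeout = timeout;
        this.maxElements = maxElements;
    }

    void onElement(String key, long timestamp) {
        List<Long> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(timestamp);
        // Every element can push the end of the session further out; never pull it back.
        deadlines.merge(key, timestamp + timeout, Long::max);
        if (buffer.size() % maxElements == 0) {
            emitted.add("early " + key + " " + buffer.size()); // fire, but keep the buffer
        }
    }

    void onWatermark(long watermark) {
        // Collect closed keys first so the map is not modified while iterating over it.
        List<String> closed = new ArrayList<>();
        for (Map.Entry<String, Long> entry : deadlines.entrySet()) {
            if (watermark > entry.getValue()) {
                closed.add(entry.getKey());
            }
        }
        for (String key : closed) {
            emitted.add("final " + key + " " + buffers.get(key).size()); // fire ...
            buffers.remove(key);                                        // ... and purge
            deadlines.remove(key);
        }
    }
}
```

The important detail is that each new element moves the session deadline forward, so a deadline derived from an earlier element of the same key must not close the session.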

This implementation has one caveat, though. It cannot deal with elements of different sessions that arrive intermingled with each other. The reason is that, unlike the Cloud Dataflow API, Flink does not yet support merging the windows that a WindowAssigner assigns. This means that elements cannot be assigned to session windows; instead, the GlobalWindow workaround has to be used. I want to tackle this for the release after 0.10, however.

Please let us know if you need more information. I’m always happy to help in these interesting cases at the bleeding edge of what is possible. :-)

Cheers,
Aljoscha

> On 16 Oct 2015, at 19:36, Hamilton, Paul <[hidden email]> wrote:
> [...]


Re: Session Based Windows

Hamilton, Paul
Hi Aljoscha,

Thanks a lot for your Trigger implementation; it definitely helped provide
some direction, and it appears to be working well for our use case.  One
thing I have noticed, now that I have pulled in the state API changes, is
that key-based state within a window function does not appear to be
working.  Perhaps I am not using it correctly now that the API has
changed.  Previously we had done something like this within the
RichWindowFunction:

@Override
public void open(final Configuration parameters) throws Exception {
    state = getRuntimeContext().getOperatorState("state", new StatePojo(), true);
}

Based on the API changes I switched it to:

@Override
public void open(final Configuration parameters) throws Exception {
    state = getRuntimeContext().getKeyValueState("state", StatePojo.class, new StatePojo());
}


But the state doesn’t seem to be partitioned by key.  I haven’t had much
time to play around with it, so it’s certainly possible that I messed
something up while refactoring to the new API.  I will look at it further
when I get a chance, but if you have any thoughts they are much
appreciated.


Thanks,
Paul Hamilton


On 10/17/15, 6:39 AM, "Aljoscha Krettek" <[hidden email]> wrote:

> [...]


Re: Session Based Windows

Aljoscha Krettek
Hi Paul,
good to hear that the windowing works for you.

With the key-based state, I’m afraid you found a bug. The problem is that the state backend is not set to the correct key when the window is evaluated, so state accessed from the window function is not scoped to that window’s key. I will look into fixing this ASAP, before the 0.10 release.
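A toy model of why the backend has to be switched to the window’s key before the window function runs: key/value state reads and writes go to the slot of whatever key is current. This is not Flink’s actual state backend; ToyKeyedState and KeyedStateDemo are hypothetical names for this sketch, and the "count how often the window fired" logic is just an example workload.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical toy key/value state: every read and write goes to the slot of the current key. */
final class ToyKeyedState<K, V> {
    private final Map<K, V> slots = new HashMap<>();
    private final V defaultValue;
    private K currentKey;

    ToyKeyedState(V defaultValue) {
        this.defaultValue = defaultValue;
    }

    /** The operator must call this before invoking user code for a key. */
    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    V value() {
        return slots.getOrDefault(currentKey, defaultValue);
    }

    void update(V value) {
        slots.put(currentKey, value);
    }
}

/** Hypothetical window evaluation that counts how often each key's window fired. */
final class KeyedStateDemo {
    static int evaluate(ToyKeyedState<String, Integer> state, String key, boolean setKeyFirst) {
        // Correct behaviour: scope the state to the key of the window being evaluated.
        // With setKeyFirst == false, whatever key was set last is used instead (the bug).
        if (setKeyFirst) {
            state.setCurrentKey(key);
        }
        int next = state.value() + 1;
        state.update(next);
        return next;
    }
}
```

In the buggy run, evaluating the window for key a while the backend still points at key b writes into b’s slot, so b later sees a count of 2 although its own window fired only once.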

Cheers,
Aljoscha

> On 21 Oct 2015, at 19:32, Hamilton, Paul <[hidden email]> wrote:
> [...]


Re: Session Based Windows

Aljoscha Krettek
Hi Paul,
the key-based state should now be fixed in the current 0.10-SNAPSHOT builds, if you want to continue playing around with it.

Cheers,
Aljoscha

> On 21 Oct 2015, at 19:40, Aljoscha Krettek <[hidden email]> wrote:
> [...]


Re: Session Based Windows

snntr
In reply to this post by Aljoscha Krettek
Hi Aljoscha,

sorry to bother you again (this time with this old thread), just a short
question about the caveat you mention in your answer. You wrote that the
trigger cannot handle events of different sessions that arrive intermingled.
Isn't the idea of the keyBy expression in your example precisely to avoid
intermingled sessions by first grouping by session id?

Cheers and thank you,

Konstantin

On 17.10.2015 14:39, Aljoscha Krettek wrote:

> [...]

--
Konstantin Knauf * [hidden email] * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082

Re: Session Based Windows

Vladimir Stoyak
We were also trying to address session windowing, but took a slightly different approach to which window we place the event into.

We did not want the "triggering event" to be purged as part of the window it triggered; instead, we create a new window for it and let the old window fire and purge on the event-time timeout.
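A minimal sketch of the approach described above, for a single key and assuming elements arrive in timestamp order; it is not the code from the linked snippet, and NewWindowOnGap is a hypothetical name. An element that arrives more than timeout after the previous one opens a new window instead of being emitted with the old one, and each old window fires and is purged only once the watermark passes its last timestamp plus the timeout.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical single-key sketch: a gap-crossing element opens a new window. */
final class NewWindowOnGap {
    private final long timeout;
    private final Deque<List<Long>> pending = new ArrayDeque<>(); // oldest first; last is still open
    final List<List<Long>> fired = new ArrayList<>();

    NewWindowOnGap(long timeout) {
        this.timeout = timeout;
    }

    void onElement(long timestamp) { // assumes timestamps arrive in order
        List<Long> current = pending.peekLast();
        if (current == null || timestamp - current.get(current.size() - 1) > timeout) {
            current = new ArrayList<>(); // the "triggering" element starts a fresh window ...
            pending.addLast(current);    // ... and the old window waits for its own timeout
        }
        current.add(timestamp);
    }

    void onWatermark(long watermark) {
        // Fire and purge every window whose last element is more than timeout behind the watermark.
        while (!pending.isEmpty()) {
            List<Long> oldest = pending.peekFirst();
            if (watermark <= oldest.get(oldest.size() - 1) + timeout) {
                break;
            }
            fired.add(pending.pollFirst());
        }
    }
}
```

With a timeout of 5 and elements 1, 3, 20, 22, the element at 20 opens a second window; the window [1, 3] fires only when a watermark greater than 8 arrives, and it does not contain 20.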

Take a look and see if it is useful:
https://bitbucket.org/snippets/vstoyak/o9Rqp

Vladimir



On Tuesday, November 17, 2015 11:25 PM, Konstantin Knauf <[hidden email]> wrote:
> [...]

Re: Session Based Windows

Aljoscha Krettek
Hi Konstantin,
you are right, if the stream is keyed by the session-id then it works.

I was referring to the case where you have, for example, some interactions with timestamps and you want to derive the sessions from those timestamps. In that case, events that should belong to one session (according to their timestamps) can arrive intermixed with elements that should belong to another session, because of delays (elements never really arrive in the order of their timestamps). Does this make clear what I meant? It’s a bit tricky, so I can draw a picture if that helps.
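The window merging that Dataflow-style session windows rely on can be sketched in plain Java for a single key; MergingSessions is a hypothetical name, not a Flink class. Each element gets a proto-window [timestamp, timestamp + gap), and any existing windows it overlaps are merged into one. With a gap of 6, elements at 1 and 10 first form two separate sessions; when a delayed element at 5 arrives, its window [5, 11) overlaps both, and they collapse into a single session [1, 16).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical single-key sketch of merging session windows, independent of arrival order. */
final class MergingSessions {

    /** Half-open event-time interval [start, end). */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        boolean intersects(Window other) {
            return start < other.end && other.start < end;
        }
    }

    private final long gap;
    final List<Window> windows = new ArrayList<>(); // current, mutually disjoint sessions

    MergingSessions(long gap) {
        this.gap = gap;
    }

    void add(long timestamp) {
        Window merged = new Window(timestamp, timestamp + gap); // proto-window for this element
        Iterator<Window> it = windows.iterator();
        while (it.hasNext()) {
            Window w = it.next();
            if (w.intersects(merged)) { // overlapping sessions collapse into one
                merged = new Window(Math.min(w.start, merged.start), Math.max(w.end, merged.end));
                it.remove();
            }
        }
        windows.add(merged);
    }
}
```

A non-merging assigner has to pick each element's window on arrival, which is exactly what breaks when the element at 5 arrives after 10. Later Flink releases added merging window assigners such as EventTimeSessionWindows that handle this natively.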

Cheers,
Aljoscha

> On 18 Nov 2015, at 09:09, Vladimir Stoyak <[hidden email]> wrote:
> [...]


Re: Session Based Windows

snntr
Hi Aljoscha,

thanks, that's what I thought. Just wanted to verify that keyBy +
SessionWindow() works with intermingled events.

Cheers,

Konstantin

On 18.11.2015 11:14, Aljoscha Krettek wrote:

> [...]
