Last event of each window belongs to the next window - Wrong

Samir Abdou

I am using Flink 1.2-Snapshot. My data looks like the following:

  • id=25398102, sourceId=1, ts=2016-10-15 00:00:56, user=14, value=919
  • id=25398185, sourceId=1, ts=2016-10-15 00:01:06, user=14, value=920
  • id=25398210, sourceId=1, ts=2016-10-15 00:01:16, user=14, value=944
  • id=25398235, sourceId=1, ts=2016-10-15 00:01:24, user=3149, value=944
  • id=25398236, sourceId=1, ts=2016-10-15 00:01:25, user=71, value=955
  • id=25398239, sourceId=1, ts=2016-10-15 00:01:26, user=71, value=955
  • id=25398265, sourceId=1, ts=2016-10-15 00:01:36, user=71, value=955
  • id=25398310, sourceId=1, ts=2016-10-15 00:02:16, user=14, value=960
  • id=25398320, sourceId=1, ts=2016-10-15 00:02:26, user=14, value=1000

I am running the following code to create windows based on user IDs:

    stream.flatMap(new LogsParser())
            .assignTimestampsAndWatermarks(new MessageTimestampExtractor())
            .keyBy("sourceId")
            .window(GlobalWindows.create())
            .trigger(PurgingTrigger.of(new MySessionTrigger()))
            .apply(new SessionWindowFunction())
            .print();

MySessionTrigger inspects each received event and fires the window when the user ID changes. SessionWindowFunction simply creates a session out of the window's contents.

Here are the sessions created:

  1. Session:

    • id=25398102, sourceId=1, ts=2016-10-15 00:00:56, user=14, value=919
    • id=25398185, sourceId=1, ts=2016-10-15 00:01:06, user=14, value=920
    • id=25398210, sourceId=1, ts=2016-10-15 00:01:16, user=14, value=944
    • id=25398235, sourceId=1, ts=2016-10-15 00:01:24, user=3149, value=944
  2. Session:

    • id=25398236, sourceId=1, ts=2016-10-15 00:01:25, user=71, value=955
    • id=25398239, sourceId=1, ts=2016-10-15 00:01:26, user=71, value=955
    • id=25398265, sourceId=1, ts=2016-10-15 00:01:36, user=71, value=955
    • id=25398310, sourceId=1, ts=2016-10-15 00:02:16, user=14, value=960
  3. Session:

    • id=25398320, sourceId=1, ts=2016-10-15 00:02:26, user=14, value=1000

The problem, as you can see, is that the last event in every session actually belongs to the next window. The decision to trigger the window comes too late, because the last event is already in the window.

How can I trigger the window without considering the last event in that window?

Thanks for your help.

Re: Last event of each window belongs to the next window - Wrong

Till Rohrmann
Hi Samir,

The windowing API in Flink works as follows. First, an incoming element is assigned to a window. This is defined in the window clause, where you create GlobalWindows. Thus, all elements with the same sourceId are assigned to the same window. Next, the element is handed to a Trigger, which decides whether the window should be evaluated. At that point, however, the element is already part of the window. That is why the last element of your window has a different user ID.
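
The ordering described here can be reproduced without Flink. The following is a minimal plain-Java simulation, not the actual MySessionTrigger (whose source is not shown in this thread). The class name `AssignThenTrigger`, the `Event` type, and the rule that the trigger forgets the last user ID when it fires are all assumptions, chosen because they reproduce the sessions listed in the original post.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation (no Flink dependency) of
// "add the element to the window first, ask the trigger second".
class AssignThenTrigger {

    // Hypothetical event type carrying only the fields needed here.
    static final class Event {
        final long id;
        final int user;
        Event(long id, int user) { this.id = id; this.user = user; }
    }

    // The nine events from the original post (id, user).
    static List<Event> sample() {
        return Arrays.asList(
            new Event(25398102L, 14), new Event(25398185L, 14), new Event(25398210L, 14),
            new Event(25398235L, 3149),
            new Event(25398236L, 71), new Event(25398239L, 71), new Event(25398265L, 71),
            new Event(25398310L, 14), new Event(25398320L, 14));
    }

    // Returns the windows in emission order. The final list is whatever is
    // still buffered when the input ends; it has not been fired.
    static List<List<Event>> run(List<Event> input) {
        List<List<Event>> windows = new ArrayList<>();
        List<Event> buffer = new ArrayList<>();
        Integer lastUser = null;               // trigger state (assumed to reset on fire)
        for (Event e : input) {
            buffer.add(e);                     // 1. the element joins the window
            boolean fire = lastUser != null && lastUser != e.user;  // 2. the trigger decides
            if (fire) {
                windows.add(new ArrayList<>(buffer));
                buffer.clear();                // purge, as PurgingTrigger would
                lastUser = null;
            } else {
                lastUser = e.user;
            }
        }
        windows.add(buffer);
        return windows;
    }

    public static void main(String[] args) {
        for (List<Event> window : run(sample())) {
            StringBuilder sb = new StringBuilder();
            for (Event e : window) sb.append(e.user).append(' ');
            System.out.println(sb.toString().trim());
        }
    }
}
```

Running `main` prints the user IDs per window: `14 14 14 3149`, then `71 71 71 14`, then `14`. The event that causes the trigger to fire is always the last element of the fired window, because it was added before the trigger saw it.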

What you could try instead is a MergingWindowAssigner to create windows whose elements all have the same ID. There you assign all elements with the same ID to the same session window. The session windows are then triggered by event time, for example. That is the recommended way to create session windows with Flink. Here is some documentation for session windows [1].
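
As a rough illustration of what gap-based session windows compute, here is a plain-Java sketch with no Flink dependency. It is one reading of the suggestion, not Flink's implementation: events are grouped per user, and a new session starts when the time since that user's previous event exceeds a gap. The class `GapSessions`, the 30-second gap, and the encoding of timestamps as seconds since 00:00:00 are assumptions made for the example. In a Flink job, the closest built-in equivalent would be keying by user and using a session window assigner such as `EventTimeSessionWindows.withGap(...)`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Flink dependency) of gap-based sessions per user.
class GapSessions {

    // events: rows of {user, timestampSeconds}, assumed sorted by timestamp.
    // Returns, per user, the sessions as lists of timestamps.
    static Map<Integer, List<List<Integer>>> sessionize(int[][] events, int gapSeconds) {
        Map<Integer, List<List<Integer>>> byUser = new LinkedHashMap<>();
        for (int[] e : events) {
            int user = e[0];
            int ts = e[1];
            List<List<Integer>> sessions = byUser.computeIfAbsent(user, k -> new ArrayList<>());
            List<Integer> current = sessions.isEmpty() ? null : sessions.get(sessions.size() - 1);
            // Start a new session for the user's first event or when the gap is exceeded.
            if (current == null || ts - current.get(current.size() - 1) > gapSeconds) {
                current = new ArrayList<>();
                sessions.add(current);
            }
            current.add(ts);
        }
        return byUser;
    }

    // The sample data from the original post as {user, seconds since 00:00:00}.
    static int[][] sample() {
        return new int[][] {
            {14, 56}, {14, 66}, {14, 76}, {3149, 84}, {71, 85},
            {71, 86}, {71, 96}, {14, 136}, {14, 146}
        };
    }

    public static void main(String[] args) {
        System.out.println(sessionize(sample(), 30));
    }
}
```

With a 30-second gap, user 14 ends up with two sessions (the events at 00:00:56 to 00:01:16, and those at 00:02:16 to 00:02:26), while users 3149 and 71 get one session each. Note that the boundaries come from time gaps per user, not from user ID changes in the stream.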


Cheers,
Till

In reply to Samir Abdou's message of Sun, Nov 6, 2016 at 12:11 PM.

Re: Last event of each window belongs to the next window - Wrong

Aljoscha Krettek
Hi,
Why are you keying by the source ID and not by the user ID?

Cheers,
Aljoscha

In reply to Till Rohrmann's message of Mon, 7 Nov 2016 at 15:42.

Re: Last event of each window belongs to the next window - Wrong

Samir Abdou
In reply to this post by Till Rohrmann
Hi Till,

Thanks for your answer and the hint. 

However, the trigger must be based on user ID changes, not on time. I tried this approach too, but I ended up with some events that have the same user ID landing in the next window. I finally solved the problem by implementing a custom WindowFunction that pushes the last event of each window to the beginning of the next window. I think a proper solution would be to implement a custom WindowAssigner together with a Trigger that just emits the windows.
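
A minimal plain-Java sketch of this carry-over idea follows. It is not the actual WindowFunction from this thread. The class `CarryOverSessions` and its methods are hypothetical, events are reduced to their user IDs, and the sketch assumes the trigger fires on every user ID change by comparing each element with the previous one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch (no Flink dependency) of moving the boundary event
// from the end of one fired window to the start of the next session.
class CarryOverSessions {

    // Boundary event held between invocations. In a real Flink job this would
    // need to live in keyed, checkpointed state; a plain field is an assumption
    // made to keep the sketch short.
    private Integer carried = null;

    // Takes one fired window, whose last element already carries the next
    // user ID, and returns the corrected session.
    List<Integer> apply(List<Integer> firedWindow) {
        List<Integer> session = new ArrayList<>();
        if (carried != null) {
            session.add(carried);              // boundary event of the previous window
        }
        session.addAll(firedWindow.subList(0, firedWindow.size() - 1));
        carried = firedWindow.get(firedWindow.size() - 1);
        return session;
    }

    // Simulates the pipeline: each element is added to the buffer first, the
    // window fires on a user ID change, and the fired window goes through apply().
    static List<List<Integer>> sessions(List<Integer> users) {
        CarryOverSessions fn = new CarryOverSessions();
        List<List<Integer>> out = new ArrayList<>();
        List<Integer> buffer = new ArrayList<>();
        Integer previous = null;
        for (Integer user : users) {
            buffer.add(user);
            if (previous != null && !previous.equals(user)) {
                out.add(fn.apply(buffer));
                buffer = new ArrayList<>();    // purge after firing
            }
            previous = user;
        }
        return out;                            // the still-open session is not emitted
    }

    public static void main(String[] args) {
        System.out.println(sessions(Arrays.asList(14, 14, 14, 3149, 71, 71, 71, 14, 14)));
    }
}
```

For the user IDs from the original post, `main` prints `[[14, 14, 14], [3149], [71, 71, 71]]`. The final two events of user 14 stay pending (one carried, one buffered) until the next user ID change fires the window.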

Cheers,
Samir

Re: Last event of each window belongs to the next window - Wrong

Samir Abdou
In reply to this post by Aljoscha Krettek
Hi Aljoscha,

Thanks for the question.

I key by source ID because I want to isolate users per source. If I keyed by user ID, I would need logic to create sessions based on time. But I would like to create my sessions based on user ID changes in the event stream of each source.

Cheers,
Samir

In reply to Aljoscha Krettek's message of 2016-11-07 18:04 GMT+01:00.

Re: Last event of each window belongs to the next window - Wrong

Aljoscha Krettek
Hi Samir,
Can events with the same user ID originate from different sources? If so, acting on changes in the user ID is problematic, because there are no ordering guarantees.

Cheers,
Aljoscha

In reply to Samir Abdou's message of Tue, 8 Nov 2016 at 19:59.

Re: Last event of each window belongs to the next window - Wrong

Samir Abdou
Hi Aljoscha,

Yes, the same user ID can originate from different sources. You are right that ordering cannot be guaranteed if you consider user IDs across sources. However, keying by source ID isolates all the user IDs within each source, so I believe it should be fine. At least I get the results I expect.

Best,
Samir

In reply to Aljoscha Krettek's message of 2016-11-09 13:43 GMT+01:00.