EventTimeSessionWindow firing too soon

orips
I'm using Flink 1.10 on YARN, and I have an EventTimeSessionWindow with a gap of 30 minutes.

But as soon as I start the job, events are written to the sink (I can see them in S3) even though 30 minutes have not passed.

This is my job:

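import java.util.concurrent.TimeUnit.{MINUTES, SECONDS}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

val stream = senv
      .addSource(new FlinkKafkaConsumer("…", compressedEventDeserializer, properties))
      .filter(_.sessionId.nonEmpty)
      .flatMap(_ match { case (_, events) => events })
      .assignTimestampsAndWatermarks(new TimestampExtractor[Event](Time.minutes(10)) {
        // sequence is in microseconds; divide by 1000 to get epoch milliseconds
        override def extractTimestamp(element: Event): Long = element.sequence / 1000
      })
      .keyBy(_.sessionId)
      .window(EventTimeSessionWindows.withGap(Time.of(30, MINUTES)))
      .process(myProcessWindowFunction)

AsyncDataStream.unorderedWait(stream, myAsyncS3Writer, 30, SECONDS, 100)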

Any idea why it's happening?

Re: EventTimeSessionWindow firing too soon

1048262223
Hi

I think it may be because you're using event time and the gaps between your events' timestamps are larger than 30 minutes. Maybe you can check the timestamps in the source data.

Best,
Yichao Yang



------------------ Original ------------------
From: Ori Popowski <[hidden email]>
Date: Mon, Jun 15, 2020 10:50 PM
To: user <[hidden email]>
Subject: Re: EventTimeSessionWindow firing too soon


Re: EventTimeSessionWindow firing too soon

rmetzger0
If you are using event time in Flink, it is disconnected from real-world wall-clock time.
You can process historical data in a streaming program as if it were real-time data (potentially reading through years of data, measured in event time, in a few minutes of wall-clock time).
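A minimal, Flink-free Scala sketch of that point (the object and class names are hypothetical, not Flink API): the watermark below is the highest event timestamp seen so far minus a fixed bound, roughly the rule behind Flink's `BoundedOutOfOrdernessTimestampExtractor`. Nothing in it reads the wall clock, so replaying two hours of event-time data in a tight loop moves the watermark two hours forward almost instantly.

```scala
object WatermarkSketch {
  val MinuteMs: Long = 60 * 1000L

  // Hypothetical helper, not Flink API: watermark = highest timestamp seen - bound.
  final class BoundedWatermark(boundMs: Long) {
    // Start low enough that the initial watermark cannot overflow.
    private var maxTs: Long = Long.MinValue + boundMs

    // Only the element's own timestamp is consulted; no wall clock involved.
    def onEvent(tsMs: Long): Unit =
      if (tsMs > maxTs) maxTs = tsMs

    def current: Long = maxTs - boundMs
  }

  // Replays one event per event-time minute, as fast as the loop can run.
  def replay(minutes: Int, boundMs: Long): Long = {
    val wm = new BoundedWatermark(boundMs)
    (0 to minutes).foreach(m => wm.onEvent(m * MinuteMs))
    wm.current
  }
}
```

For example, `WatermarkSketch.replay(120, 5 * WatermarkSketch.MinuteMs)` returns 115 minutes in milliseconds, regardless of how little wall-clock time the loop took.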


Re: EventTimeSessionWindow firing too soon

orips
So why is it happening? I have no clue at the moment.
My event-time timestamps also do not have big gaps between them that would explain the window triggering.



Re: EventTimeSessionWindow firing too soon

Rafi Aroch
Hi Ori,

I guess you're consuming from Kafka starting at the earliest offset, so you're reading historical data and Flink is catching up.

Regarding "My event-time timestamps also do not have big gaps":

Just to verify: since you keyBy sessionId, are you checking the gaps between events of the same session?
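One way to run that check offline, as a minimal sketch: group `(sessionId, timestampMs)` pairs by session and report the largest gap between consecutive events of each session. The object and method names are hypothetical, and the pairs are assumed to have already been extracted from the source data (for example, with `sequence / 1000` as the timestamp).

```scala
object SessionGapCheck {
  // For each session, the largest gap (ms) between consecutive events,
  // reported only for sessions where that gap exceeds gapMs.
  def sessionsWithLargeGaps(events: Seq[(String, Long)], gapMs: Long): Map[String, Long] =
    events
      .groupBy { case (sessionId, _) => sessionId }
      .map { case (sessionId, evs) =>
        val sorted = evs.map(_._2).sorted
        // Differences between neighbours; empty for a single-event session.
        val gaps = sorted.zip(sorted.drop(1)).map { case (a, b) => b - a }
        sessionId -> gaps.foldLeft(0L)((acc, g) => math.max(acc, g))
      }
      .filter { case (_, maxGap) => maxGap > gapMs }
}
```

Any session that shows up in the result has at least one internal gap longer than `gapMs`, so a session window with that gap would legitimately split it.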

Rafi



Re: EventTimeSessionWindow firing too soon

orips
Hi, thanks for answering.

> I guess you consume from Kafka from the earliest offset, so you consume historical data and Flink is catching-up.
Yes, that's what's happening. But Kafka is partitioned by sessionId, so skew between partitions cannot explain it.
I think the only way it can happen is if there's suddenly a single event with a very late timestamp.

> Just to verify, if you do keyBy sessionId, do you check the gaps of events from the same session?
Good point. sessionId is unique in this case, and even if it weren't, every single session suffers from this early-triggering problem, so it's very unlikely that all of the millions of sessions within that hour had duplicates.

I suspect that having two ProcessWindowFunctions one after the other somehow causes this.
I deployed a version with a single window function that just writes the timestamps to S3 (to find out whether I have event-time jumps), and suddenly it doesn't trigger early (it has been running for 10 minutes and not a single event has arrived at the sink).


Re: EventTimeSessionWindow firing too soon

Aljoscha Krettek
In reply to this post by orips
Hi,

what is the timescale of your data in Kafka? If the data in there
spans more than ~30 minutes, I would expect your windows to fire
very soon after the job is started. Event time does not depend on a
wall clock; instead, it advances with the timestamps in the stream. As
Flink advances through the data in Kafka, event time advances in step.

Does that explain your situation?
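To make the firing rule concrete, a minimal, Flink-free sketch of how event-time session windows behave (hypothetical names; it assumes all timestamps of one key are known up front, which Flink does not): every element gets its own window `[ts, ts + gap)`, overlapping or touching windows are merged into one session, and with the default event-time trigger a session fires once the watermark reaches its end minus one millisecond.

```scala
object SessionMergeSketch {
  // [start, end) in ms; the last timestamp that belongs to the window is end - 1.
  final case class Window(start: Long, end: Long) {
    def maxTimestamp: Long = end - 1
    // Overlapping or touching windows merge (same test as Flink's TimeWindow.intersects).
    def intersects(o: Window): Boolean = start <= o.end && end >= o.start
    def cover(o: Window): Window = Window(math.min(start, o.start), math.max(end, o.end))
  }

  // Each element starts a window [ts, ts + gap); sorted input means only the
  // most recent session can overlap the next element's window.
  def sessions(timestamps: Seq[Long], gapMs: Long): List[Window] =
    timestamps.sorted.foldLeft(List.empty[Window]) {
      case (last :: rest, ts) if Window(ts, ts + gapMs).intersects(last) =>
        last.cover(Window(ts, ts + gapMs)) :: rest
      case (acc, ts) => Window(ts, ts + gapMs) :: acc
    }.reverse

  // A session fires once the watermark reaches its maxTimestamp.
  def fired(w: Window, watermark: Long): Boolean = watermark >= w.maxTimestamp
}
```

For example, events at 11:23, 11:25, 11:28 and 11:31 with a 30-minute gap merge into one session ending at 12:01, and that session fires as soon as the watermark gets there, whether that takes 30 minutes or 30 milliseconds of wall-clock time.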

Best,
Aljoscha


Re: EventTimeSessionWindow firing too soon

orips
In reply to this post by orips
Okay, so I created a simple stream (similar to the original stream), where I just write the timestamps of each evaluated window to S3.
The session gap is 30 minutes, and this is one of the sessions:

first-event  last-event  num-events
11:23        11:23       11
11:25        11:26       51
11:28        11:29       74
11:31        11:31       13

Again, this is one session. How can we explain this? Why does Flink create 4 distinct windows within 8 minutes? I'm really lost here; I'd appreciate some help.
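One hypothesis that reproduces exactly this pattern, as a simplified, Flink-free model of a single key (not Flink's actual `WindowOperator`; names are hypothetical): assume zero allowed lateness, which is Flink's default; merge each new event's `[ts, ts + gap)` window into any pending window; drop the event if the merged window is already behind the watermark; and fire and forget a window as soon as the watermark passes its end. If the watermark runs roughly a gap ahead of a session's events, each burst of the session closes before the next one arrives.

```scala
object SplitSessionSketch {
  final case class Window(start: Long, end: Long) {
    def maxTimestamp: Long = end - 1
    def intersects(o: Window): Boolean = start <= o.end && end >= o.start
    def cover(o: Window): Window = Window(math.min(start, o.start), math.max(end, o.end))
  }

  // One arriving event of a single session key, plus the watermark right after it.
  final case class Step(eventTs: Long, watermarkAfter: Long)
  final case class Result(fired: List[Window], dropped: List[Long])

  // Simplified model of one key's session windows with zero allowed lateness.
  def run(steps: Seq[Step], gapMs: Long): Result = {
    var watermark = Long.MinValue
    var pending = List.empty[Window]
    val fired = List.newBuilder[Window]
    val dropped = List.newBuilder[Long]
    steps.foreach { s =>
      val w = Window(s.eventTs, s.eventTs + gapMs)
      // Merge the new event's window with any pending window it overlaps.
      val (overlap, others) = pending.partition(_.intersects(w))
      val merged = overlap.foldLeft(w)(_ cover _)
      // A merged window already behind the watermark is late: drop the event.
      if (merged.maxTimestamp <= watermark) dropped += s.eventTs
      else pending = merged :: others
      // Advance the watermark; fire (and forget) every window it has passed.
      watermark = math.max(watermark, s.watermarkAfter)
      val (ready, waiting) = pending.partition(_.maxTimestamp <= watermark)
      fired ++= ready.sortBy(_.start)
      pending = waiting
    }
    Result(fired.result(), dropped.result())
  }
}
```

In the test scenario with the watermark about 30 minutes ahead, one session yields two separate windows and one dropped event; when the watermark only moves past the session after all of its events, they end up in a single window.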


Re: EventTimeSessionWindow firing too soon

Aljoscha Krettek
In reply to this post by Aljoscha Krettek
Sorry, I only now saw that this thread had diverged. My mail client didn't
pick it up because someone messed up the subject of the thread.


Re: EventTimeSessionWindow firing too soon

Aljoscha Krettek
In reply to this post by orips
Did you look at the watermark metrics? Do you know what the current
watermark is when the windows are firing? You could also get the current
watermark in your ProcessWindowFunction and emit it in the records that
you're printing, for debugging.

What TimestampAssigner are you using as your timestamp
assigner/watermark extractor?

Best,
Aljoscha


Re: EventTimeSessionWindow firing too soon

orips
Hi @aljoscha

The watermark metrics look fine. (attached screenshot)
[Attached image: screenshot of the watermark metrics]

This is the extractor:
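import java.time.Instant
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

class TimestampExtractor[A, B <: AbstractEvent] extends BoundedOutOfOrdernessTimestampExtractor[(A, B)](Time.minutes(5)) {
  override def extractTimestamp(element: (A, B)): Long =
    Instant.now.toEpochMilli.min(element._2.sequence / 1000) // microseconds -> ms, capped at now
}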
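For reference, the extractor's arithmetic in isolation, as a minimal plain-Scala sketch (hypothetical names; the `max - bound` watermark is a simplification of what `BoundedOutOfOrdernessTimestampExtractor` emits). It shows that the cap at `Instant.now` keeps every timestamp at or below the wall clock, and also that, while replaying history, a single event with a future `sequence` would be clamped to "now" and pull the watermark up to "now" minus the bound.

```scala
object ExtractorSketch {
  // Same arithmetic as the extractor: microseconds -> epoch millis, capped at nowMs.
  def timestampMs(sequenceMicros: Long, nowMs: Long): Long =
    math.min(nowMs, sequenceMicros / 1000)

  // Simplified watermark after a non-empty batch: max timestamp minus the bound.
  def watermarkAfter(sequencesMicros: Seq[Long], nowMs: Long, boundMs: Long): Long =
    sequencesMicros.map(timestampMs(_, nowMs)).max - boundMs
}
```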

I'll try to output the watermark and report my findings.


Re: EventTimeSessionWindow firing too soon

orips
Hi, these are my findings with the watermark. This is one of the sessions, which had 3 windows within 10 minutes:

watermark: 1:20 first-ts: 12:49, last-ts: 12:49
watermark: 1:23 first-ts: 12:50, last-ts: 12:50
watermark: 1:31 first-ts: 12:59, last-ts: 12:59

This is the job graph of said session:
[Attached image: job graph]



Re: EventTimeSessionWindow firing too soon

orips
Now I see it: the watermark is 30 minutes ahead of those events.
I guess your initial explanation, that the data in Kafka spans more than 30 minutes, is the reason.

What's the solution for this, though? If I'm reading historical data from Kafka and some partitions contain newer data, this problem will always happen. Is there any way to mitigate it?
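A minimal, Flink-free sketch of that situation (hypothetical names; it assumes one watermark per source subtask, computed as the highest timestamp seen minus the bound, which is roughly how an extractor applied after the source behaves when a subtask reads several partitions): the caught-up partition drives the watermark, so every record from the lagging partition arrives well behind it.

```scala
object MixedPartitionsSketch {
  // (partition, timestampMs), in the order one source subtask emits them.
  type Record = (Int, Long)

  // Feeds all partitions through ONE watermark (highest timestamp seen so far
  // minus boundMs) and returns each record that arrives at or behind that
  // watermark, together with how far behind it is in ms.
  def lagBehindWatermark(records: Seq[Record], boundMs: Long): Vector[(Record, Long)] =
    records.foldLeft((Option.empty[Long], Vector.empty[(Record, Long)])) {
      case ((maxTs, acc), rec @ (_, ts)) =>
        val watermark = maxTs.map(_ - boundMs) // None until the first record
        val updated = watermark.filter(ts <= _).fold(acc)(wm => acc :+ (rec -> (wm - ts)))
        (Some(maxTs.fold(ts)(math.max(_, ts))), updated)
    }._2
}
```

In the test data, partition 0 is about 30 minutes ahead of partition 1, so partition 1's records arrive 26 minutes behind the watermark; with a 30-minute session gap, their windows close within a few minutes of event time.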
