Re: EventTimeSessionWindow firing too soon

Posted by orips on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/EventTimeSessionWindow-firing-too-soon-tp35959p36001.html

Hi, these are my findings with the watermark. This is one of the sessions, which had 3 windows within 10 minutes:

watermark: 1:20 first-ts: 12:49, last-ts: 12:49
watermark: 1:23 first-ts: 12:50, last-ts: 12:50
watermark: 1:31 first-ts: 12:59, last-ts: 12:59

This is the job graph of said session:
image.png
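
As a rough check of the numbers above, here is a minimal sketch. It assumes the 30-minute session gap mentioned further down the thread, and it reads the watermark values 1:20, 1:23 and 1:31 as 13:20, 13:23 and 13:31 (an assumption; the post does not say). It tests whether each watermark had already reached the max timestamp of a session window ending at last-ts + gap. `SessionFiringCheck`, `fires` and `rows` are names made up for this illustration.

```scala
import java.time.{Duration, LocalTime}

object SessionFiringCheck {
  // Session gap quoted in the thread.
  val gap: Duration = Duration.ofMinutes(30)

  // An event-time session window covers [first-ts, last-ts + gap) and fires once
  // the watermark reaches its max timestamp, i.e. last-ts + gap - 1 ms.
  def fires(watermark: LocalTime, lastTs: LocalTime): Boolean =
    !watermark.isBefore(lastTs.plus(gap).minusNanos(1000000L))

  // (watermark, last-ts) pairs from the post; "1:20" is read as 13:20 (assumption).
  val rows: Seq[(LocalTime, LocalTime)] = Seq(
    (LocalTime.of(13, 20), LocalTime.of(12, 49)),
    (LocalTime.of(13, 23), LocalTime.of(12, 50)),
    (LocalTime.of(13, 31), LocalTime.of(12, 59))
  )

  def main(args: Array[String]): Unit =
    rows.foreach { case (wm, last) =>
      val lead = Duration.between(last, wm).toMinutes
      println(s"watermark=$wm last-ts=$last lead=${lead}min fires=${fires(wm, last)}")
    }
}
```

Under these assumptions each watermark is 31 to 33 minutes ahead of the last event in its window. That would be consistent with the windows firing because the watermark advanced, not because of a gap inside the session.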


On Tue, Jun 16, 2020 at 4:52 PM Ori Popowski <[hidden email]> wrote:
Hi @aljoscha

The watermark metrics look fine. (attached screenshot)
image.png

This is the extractor:
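import java.time.Instant
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time
class TimestampExtractor[A, B <: AbstractEvent] extends BoundedOutOfOrdernessTimestampExtractor[(A, B)](Time.minutes(5)) {
  // Event timestamp derived from `sequence`, capped at the current wall-clock time.
  override def extractTimestamp(element: (A, B)): Long =
    Instant.now.toEpochMilli.min(element._2.sequence / 1000)
}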

I'll try to output the watermark and report my findings
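
For reference, a minimal sketch of how a bounded-out-of-orderness extractor like the one above derives its watermark: the largest timestamp seen so far minus the bound (5 minutes here), never moving backwards. The watermark is tracked per parallel extractor instance, not per key, so an event from a different session can push it forward. `WatermarkSketch`, `Tracker`, `minutes` and the two session ids are hypothetical, and the interleaving of events is invented purely for illustration; this is one possible explanation, not something confirmed in the thread.

```scala
object WatermarkSketch {
  // Time.minutes(5) from the extractor above, in milliseconds.
  val boundMs: Long = 5 * 60 * 1000L

  // Mimics the watermark rule of a bounded-out-of-orderness extractor:
  // watermark = (max timestamp seen so far) - bound, and it never decreases.
  final class Tracker {
    private var maxTs: Long = Long.MinValue + boundMs
    def observe(ts: Long): Long = {
      maxTs = math.max(maxTs, ts)
      maxTs - boundMs
    }
  }

  // Helper: time of day as milliseconds since midnight.
  def minutes(h: Int, m: Int): Long = (h * 60L + m) * 60 * 1000

  def main(args: Array[String]): Unit = {
    val tracker = new Tracker
    // Hypothetical events from two session ids handled by the same parallel instance.
    val events = Seq(
      ("session-a", minutes(12, 49)),
      ("session-b", minutes(13, 25)) // a different key, 36 minutes ahead
    )
    events.foreach { case (key, ts) =>
      println(s"$key ts=$ts watermark=${tracker.observe(ts)}")
    }
  }
}
```

In this made-up interleaving the watermark is 12:44 after the first event and 13:20 after the second. That is already past 13:19, the end of a 30-minute session window whose last event was at 12:49, even though session-a itself has no gap.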

On Tue, Jun 16, 2020 at 3:21 PM Aljoscha Krettek <[hidden email]> wrote:
Did you look at the watermark metrics? Do you know what the current
watermark is when the windows are firing? You could also get the current
watermark in a ProcessWindowFunction and emit it in the
records that you're printing, for debugging.
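
A minimal sketch of that debugging approach, assuming the Scala DataStream API and a Flink dependency on the classpath. `WatermarkDebugFunction` and its `timestampOf` parameter are hypothetical names introduced here; `timestampOf` is assumed to return an element's event time in milliseconds.

```scala
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Hypothetical debugging function: emits one line per fired window, including
// the watermark seen by the window operator at the moment the window fires.
class WatermarkDebugFunction[T, K](timestampOf: T => Long)
    extends ProcessWindowFunction[T, String, K, TimeWindow] {

  override def process(key: K, context: Context, elements: Iterable[T], out: Collector[String]): Unit = {
    // Event timestamps of all elements in this window (assumed extractor).
    val timestamps = elements.map(timestampOf)
    out.collect(
      s"key=$key watermark=${context.currentWatermark} " +
        s"window=[${context.window.getStart}, ${context.window.getEnd}) " +
        s"first-ts=${timestamps.min} last-ts=${timestamps.max} count=${timestamps.size}"
    )
  }
}
```

It would be passed to `.process(...)` on the session-windowed keyed stream in place of the existing window function, so each printed record carries the watermark alongside the first and last event timestamps.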

Which TimestampAssigner are you using as your timestamp
assigner/watermark extractor?

Best,
Aljoscha

On 16.06.20 14:10, Ori Popowski wrote:
> Okay, so I created a simple stream (similar to the original stream), where
> I just write the timestamps of each evaluated window to S3.
> The session gap is 30 minutes, and this is one of the sessions:
> (first-event, last-event, num-events)
>
> 11:23-11:23 11 events
> 11:25-11:26 51 events
> 11:28-11:29 74 events
> 11:31-11:31 13 events
>
> Again, this is one session. How can we explain this? Why does Flink create
> 4 distinct windows within 8 minutes? I'm really lost here, I'd appreciate
> some help.
>
> On Tue, Jun 16, 2020 at 2:17 PM Ori Popowski <[hidden email]> wrote:
>
>> Hi, thanks for answering.
>>
>>> I guess you consume from Kafka from the earliest offset, so you consume
>>> historical data and Flink is catching-up.
>> Yes, it's what's happening. But Kafka is partitioned on sessionId, so skew
>> between partitions cannot explain it.
>> I think the only way it can happen is when suddenly there's one event
>> with a very late timestamp.
>>
>>> Just to verify, if you do keyBy sessionId, do you check the gaps of
>>> events from the same session?
>> Good point. sessionId is unique in this case, and even if it's not - every
>> single session suffers from this problem of early triggering, so it's very
>> unlikely that all the millions of sessions within that hour had duplicates.
>>
>> I'm suspecting that the fact I have two ProcessWindowFunctions one after
>> the other somehow causes this.
>> I deployed a version with one window function which just prints the
>> timestamps to S3 (to find out if I have event-time jumps) and suddenly it
>> doesn't trigger early (I've been running it for 10 minutes and not a single
>> event has reached the sink).
>>
>> On Tue, Jun 16, 2020 at 12:01 PM Rafi Aroch <[hidden email]> wrote:
>>
>>> Hi Ori,
>>>
>>> I guess you consume from Kafka from the earliest offset, so you consume
>>> historical data and Flink is catching-up.
>>>
>>> Regarding: *My event-time timestamps also do not have big gaps*
>>>
>>> Just to verify, if you do keyBy sessionId, do you check the gaps of
>>> events from the same session?
>>>
>>> Rafi
>>>
>>>
>>> On Tue, Jun 16, 2020 at 9:36 AM Ori Popowski <[hidden email]> wrote:
>>>
>>>> So why is it happening? I have no clue at the moment.
>>>> My event-time timestamps also do not have big gaps between them that
>>>> would explain the window triggering.
>>>>
>>>>
>>>> On Mon, Jun 15, 2020 at 9:21 PM Robert Metzger <[hidden email]>
>>>> wrote:
>>>>
>>>>> If you are using event time in Flink, it is disconnected from the real
>>>>> world wall clock time.
>>>>> You can process historical data in a streaming program as if it was
>>>>> real-time data (potentially reading through (event time) years of data in a
>>>>> few (wall clock) minutes)
>>>>>
>>>>> On Mon, Jun 15, 2020 at 4:58 PM Yichao Yang <[hidden email]> wrote:
>>>>>
>>>>>> Hi
>>>>>>
>>>>>> I think it may be because you are using event time and the gap between
>>>>>> your events' timestamps is larger than 30 minutes. Maybe you can check
>>>>>> the timestamps in the source data.
>>>>>>
>>>>>> Best,
>>>>>> Yichao Yang
>>>>>>
>>>>>> ------------------------------
>>>>>> Sent from my iPhone
>>>>>>
>>>>>>
>>>>>> ------------------ Original ------------------
>>>>>> *From:* Ori Popowski <[hidden email]>
>>>>>> *Date:* Mon,Jun 15,2020 10:50 PM
>>>>>> *To:* user <[hidden email]>
>>>>>> *Subject:* Re: EventTimeSessionWindow firing too soon
>>>>>>
>>>>>>
>