Is it possible that late events are processed before the window?

orips

I have a job with an event-time session window with a 30-minute gap.

I send late events to a side output, where I apply a tumbling processing-time window of 30 minutes.
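Because the late path's tumbling window runs on processing time, it fires on the wall clock regardless of the watermark. Here is a minimal, Flink-free sketch of the usual tumbling-window arithmetic. It assumes a window offset of 0 and times in epoch milliseconds. `TumblingSketch`, `windowStart`, and `fireTime` are hypothetical names, not Flink API:

```scala
object TumblingSketch {
  /** Start of the tumbling window of length `size` that contains `time` (offset assumed to be 0). */
  def windowStart(time: Long, size: Long): Long =
    time - java.lang.Math.floorMod(time, size) // floorMod keeps the result correct for negative times too

  /** The window [start, start + size) fires when the clock reaches its last millisecond. */
  def fireTime(time: Long, size: Long): Long =
    windowStart(time, size) + size - 1
}
```

For example, with `size = 1800000` (30 minutes), an element that arrives at 1,000,000 ms falls into the window starting at 0 and is emitted at 1,799,999 ms, even if the main session window for the same key is still open.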

I observe that the late events are written to storage before the "main" events.

Before digging into the code and debugging the problem, I wanted to know whether this is normal.

Thanks

Re: Is it possible that late events are processed before the window?

orips
I ran an experiment in which I added an evictor to the main window (not the late one), only so that I could write a debug file when the window fires. I don't actually evict any events; the evictor just lets me write a debug object the moment the window fires.

I can see that the late-data window does fire before the main window: the debug file does not exist yet, but late events _do_ exist in the destination.

Writing this debug object in the evictor rules out problems caused by logic in the process function, and it shows that the late-events window really does fire before the main window.

Here's an outline of my job:

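import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, TumblingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time

val windowedStream = senv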
  .addSource(kafkaSource)
  ... // some operators
  // like BoundedOutOfOrderness, but ignores future timestamps
  .assignTimestampsAndWatermarks(new IgnoreFutureTimestamps(10.minutes))
  ... // some more operators
  .keyBy { case (meta, _) => meta.toPath }
  .window(EventTimeSessionWindows.withGap(Time.minutes(30))) // "main" window
  .sideOutputLateData(lateDataTag)
  .process(new ProcessSession(sessionPlayback, config))
windowedStream
  .map(new SerializeSession(sessionPlayback))
  .addSink(sink)
windowedStream
  .getSideOutput(lateDataTag)
  .keyBy { case (meta, _) => meta.toPath }
  .window(TumblingProcessingTimeWindows.of(Time.minutes(30))) // "late" window
  .process(new ProcessSession(sessionPlayback, config, true))
  .map(new SerializeSession(sessionPlayback, late = true))
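The `IgnoreFutureTimestamps` class is not shown in the thread. As a hypothetical illustration of what its comment describes, the plain-Scala model below follows the usual bounded-out-of-orderness arithmetic (watermark = highest accepted timestamp - bound - 1) and skips timestamps that lie ahead of an injected wall clock. `IgnoreFutureWatermarkModel` and its parameters are assumptions, not the author's code or a Flink class:

```scala
/**
 * Hypothetical model (not the author's IgnoreFutureTimestamps, not a Flink class) of a
 * bounded-out-of-orderness watermark that ignores timestamps from the future.
 */
final class IgnoreFutureWatermarkModel(maxOutOfOrdernessMs: Long, now: () => Long) {
  // Start low enough that the first watermark is Long.MinValue without overflowing.
  private var maxAccepted: Long = Long.MinValue + maxOutOfOrdernessMs + 1

  /** Called for every element; timestamps later than the wall clock do not advance the watermark. */
  def onEvent(timestamp: Long): Unit =
    if (timestamp <= now()) maxAccepted = math.max(maxAccepted, timestamp)

  /** The value event-time windows compare against when deciding what is late. */
  def currentWatermark: Long = maxAccepted - maxOutOfOrdernessMs - 1
}
```

With the 10-minute bound from the outline, the watermark in this model trails the newest non-future timestamp by just over 10 minutes.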

So, to repeat the question: is this normal? And if not, how can I fix it?

Thanks

On Tue, Oct 6, 2020 at 3:44 PM Ori Popowski wrote:

Re: Is it possible that late events are processed before the window?

orips
After building a toy example, I think I had the concept of the late-data side output wrong.

It seems that the late-data side output (sideOutputLateData) has nothing to do with windowing: late events go straight to the side output as soon as they arrive, and they never take part in a window firing of the main flow.
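A small, Flink-free simulation can make this concrete. It models a single key with event-time session windows, an allowed lateness of 0, and explicit watermarks. An event is routed to the side output when the session it would open has already ended by the current watermark, and a session is emitted only once the watermark reaches its last timestamp. This is a simplified approximation of the window operator's behavior, and `LateVsSessionOrder` and its types are hypothetical:

```scala
object LateVsSessionOrder {
  sealed trait Input
  final case class Event(ts: Long) extends Input
  final case class Watermark(ts: Long) extends Input

  /** An open (not yet fired) session [start, end) and the timestamps it holds. */
  private final case class Session(start: Long, end: Long, elems: Vector[Long])

  /** Processes inputs in arrival order and returns the emitted records in the order they are produced. */
  def run(inputs: Seq[Input], gap: Long): Vector[String] = {
    var watermark = Long.MinValue
    var open = List.empty[Session]
    val out = Vector.newBuilder[String]

    inputs.foreach {
      case Event(ts) =>
        // The event proposes the window [ts, ts + gap) and merges with every open session it overlaps.
        val (overlapping, rest) = open.partition(s => ts < s.end && ts + gap > s.start)
        val start = (ts :: overlapping.map(_.start)).min
        val end = ((ts + gap) :: overlapping.map(_.end)).max
        val elems = overlapping.flatMap(_.elems).toVector :+ ts
        if (end - 1 <= watermark) {
          // The window already ended by the watermark: the event goes straight to the side output.
          out += s"side-output $ts"
        } else {
          open = Session(start, end, elems) :: rest
        }
      case Watermark(t) =>
        watermark = math.max(watermark, t)
        // A session fires (and, with zero allowed lateness, is dropped) once the watermark reaches end - 1.
        val (ready, pending) = open.partition(_.end - 1 <= watermark)
        ready.sortBy(_.start).foreach { s =>
          out += s"window [${s.start}, ${s.end}) ${s.elems.sorted.mkString(",")}"
        }
        open = pending
    }
    out.result()
  }
}
```

Feeding it `Event(0), Event(20), Watermark(60), Event(100), Event(120), Watermark(125), Event(10), Event(140), Watermark(200)` with `gap = 30` emits the first session, then `side-output 10`, and only afterwards the session that started at 100. The late record leaves the operator before a main window of the same key that is open at that moment.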

On Wed, Oct 7, 2020 at 2:42 PM Ori Popowski wrote:

Re: Is it possible that late events are processed before the window?

Till Rohrmann
Hi Ori,

You are right. Events are sent to the side output for late events if the event's timestamp plus the allowed lateness is smaller than the current watermark. Downstream operators that consume this side output see these events directly.
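Written as code, the routing condition in the reply above is a one-line predicate. The sketch encodes it exactly as worded (strictly smaller than). The names are hypothetical, and it deliberately ignores window assignment, so treat it as an illustration of how allowed lateness moves the cutoff rather than as Flink's implementation. In the DataStream API, the allowed lateness itself is configured with `allowedLateness(...)` on the windowed stream.

```scala
object LateRouting {
  /**
   * Routing rule as worded in the reply (hypothetical helper, not Flink code):
   * an element is sent to the late-data side output when its timestamp plus the
   * allowed lateness is smaller than the current watermark.
   */
  def goesToSideOutput(timestamp: Long, allowedLateness: Long, currentWatermark: Long): Boolean =
    timestamp + allowedLateness < currentWatermark
}
```

With a watermark at 200 and no allowed lateness, an element stamped 100 is routed to the side output. Raising the allowed lateness to 150 keeps it in the main path.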

Cheers,
Till

On Wed, Oct 7, 2020 at 2:32 PM Ori Popowski wrote:

Re: Is it possible that late events are processed before the window?

orips
Thanks

On Wed, Oct 7, 2020 at 7:06 PM Till Rohrmann wrote: