Interaction of watermarks and windows

Sergii Mikhtoniuk
Greetings,

When playing around with the following simple event-time stream aggregation:

      SELECT
        TUMBLE_START(event_time, INTERVAL '1' DAY) as event_time,
        ...
      FROM input
      GROUP BY TUMBLE(event_time, INTERVAL '1' DAY), symbol

...to my surprise I found out that the tumbling window operator has no effect on the watermarks of the resulting append stream - the watermarks of the input stream are propagated as-is.

This seems to be a documented behavior https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#interaction-of-watermarks-and-windows but it's still very counter-intuitive to me and I couldn't find any explanation of it.

My understanding of watermarking is that MOST data is expected to arrive before the stream's watermark passes its event time. Late events are either discarded or should be handled as exceptional cases, e.g. via "allowed lateness".

So in my aggregation above I was expecting the result watermark to be offset by ~1 day from the input and be emitted only after a tumbling window closes. Instead, with input watermarks propagated as-is ALL events in the resulting stream end up being late in relation to the current watermark... Doesn't this behavior ruin the composition, as downstream operators will be discarding all late data?

I'd greatly appreciate it if someone could shed light on this design decision.

Thanks,
Sergii

Re: Interaction of watermarks and windows

Jark Wu-3
Hi Sergii,

The window operator does not affect or adjust the output watermark; it simply propagates the input watermark as-is, as the documentation says.
I think the mistake here is that you are using the wrong event time for the window: you should use TUMBLE_ROWTIME(...) as event_time [1].
The event time of a window should be the maximal timestamp of the window, e.g. for a window of [10:00, 11:00) the event time should be 10:59:59.999,
not the start time. The timestamp indicates when the event happens, and a window "happens" when it is closed (at its max timestamp).
That's how TUMBLE_ROWTIME is calculated in Flink SQL.
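
To make the arithmetic concrete, here is a minimal Python sketch of how the three window timestamps relate. It is not Flink code: the helper name tumble_bounds is hypothetical, and it assumes windows are aligned to the Unix epoch and that the rowtime is the window end minus one millisecond, matching the [10:00, 11:00) example above.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def tumble_bounds(ts: datetime, size: timedelta):
    """Return (start, end, rowtime) of the tumbling window containing ts.

    Assumptions of this sketch: windows are aligned to the epoch, the window
    is the half-open interval [start, end), and rowtime is the last
    millisecond that still belongs to the window.
    """
    offset = (ts - EPOCH) % size           # how far ts is into its window
    start = ts - offset                    # analogous to TUMBLE_START
    end = start + size                     # analogous to TUMBLE_END (exclusive)
    rowtime = end - timedelta(milliseconds=1)  # analogous to TUMBLE_ROWTIME
    return start, end, rowtime

start, end, rowtime = tumble_bounds(datetime(2020, 6, 22, 10, 25), timedelta(hours=1))
print(start.time().isoformat())                           # 10:00:00
print(end.time().isoformat())                             # 11:00:00
print(rowtime.time().isoformat(timespec="milliseconds"))  # 10:59:59.999
```

The window start is the smallest timestamp in the window, so it is the value most likely to already lie behind the watermark when the result is emitted; the rowtime is the largest.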

Best,
Jark



On Mon, 22 Jun 2020 at 04:37, Sergii Mikhtoniuk wrote:
> [...]

Re: Interaction of watermarks and windows

Benchao Li-2
In reply to this post by Sergii Mikhtoniuk
Hi Sergii,

The current watermark strategy is correct.

The window's output is driven by the watermark. When a window is triggered, the watermark that triggers it
is emitted only after the result of the window has been fully emitted. Hence, the watermark won't outpace the
right margin of the window.

What bothers you comes from assuming that TUMBLE_START gives you an event-time column. Actually, it does not.

The only way to preserve the event-time column property is to use TUMBLE_ROWTIME(), and the value of
TUMBLE_ROWTIME() is TUMBLE_END() - 1 ms (which is the right margin of the window). So the event time of every
row emitted by the window is no larger than the watermark that triggered it, and because the rows are emitted
before that watermark, they are not late downstream and no data is dropped.
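
The ordering argument above can be checked with a small simulation. The following is a rough Python sketch, not Flink's implementation: the functions tumbling_window and count_late, the tuple-based stream encoding, and the lateness rule (a row is late when its timestamp is at or below the last watermark seen before it) are all simplifying assumptions made for illustration.

```python
DAY = 24 * 60 * 60 * 1000  # one day in milliseconds

def tumbling_window(stream, size, stamp):
    """Simulate an event-time tumbling count over ("event", ts) / ("watermark", ts) items.

    When a watermark arrives, results of every window whose last millisecond
    it covers are emitted first; only then is the watermark forwarded as-is.
    stamp(start, end) chooses the timestamp attached to each result row.
    """
    counts = {}  # window start -> number of events seen so far
    for kind, ts in stream:
        if kind == "event":
            start = ts - ts % size
            counts[start] = counts.get(start, 0) + 1
        else:
            for start in sorted(s for s in counts if s + size - 1 <= ts):
                yield ("event", stamp(start, start + size), counts.pop(start))
            yield ("watermark", ts, None)

def count_late(stream):
    """Count rows whose timestamp is <= the last watermark seen before them."""
    watermark, late = float("-inf"), 0
    for kind, ts, _ in stream:
        if kind == "watermark":
            watermark = max(watermark, ts)
        elif ts <= watermark:
            late += 1
    return late

source = [
    ("event", 1_000), ("event", 2_000), ("watermark", 2_000),
    ("event", DAY + 5), ("watermark", DAY + 5),          # closes day 0
    ("event", 2 * DAY + 7), ("watermark", 2 * DAY + 7),  # closes day 1
]

# Stamping rows with the window start (like TUMBLE_START): both rows are late.
print(count_late(tumbling_window(source, DAY, lambda start, end: start)))    # 2
# Stamping rows with end - 1 (like TUMBLE_ROWTIME): no row is late.
print(count_late(tumbling_window(source, DAY, lambda start, end: end - 1)))  # 0
```

In both runs the watermarks pass through unchanged; only the timestamp chosen for the result rows decides whether a downstream operator would treat them as late.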

Hope it helps.

Best,
Benchao Li