watermark VS window trigger


Soheil Pourbafrani
Suppose we have a time window of 10 milliseconds and we use EventTime.
First, we tell Flink how to extract the timestamp and watermark from the incoming messages; after that, we key the stream and apply a time window.
aggregatedTuple
    .assignTimestampsAndWatermarks(new SampleTimestampExtractor())
    .keyBy(1)
    .timeWindow(Time.milliseconds(1000)) /*.countWindow(3)*/
    .reduce()
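
SampleTimestampExtractor itself is not shown in the thread; a minimal sketch of what such an extractor could look like (assuming the event timestamp, in epoch milliseconds, sits in the first field of a Tuple2<Long, Integer>, and allowing a 100 ms out-of-orderness bound) is:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// Sketch only: the tuple layout and the 100 ms bound are assumptions.
public class SampleTimestampExtractor
        extends BoundedOutOfOrdernessTimestampExtractor<Tuple2<Long, Integer>> {

    public SampleTimestampExtractor() {
        // Watermark = highest timestamp seen so far - 100 ms.
        super(Time.milliseconds(100));
    }

    @Override
    public long extractTimestamp(Tuple2<Long, Integer> element) {
        // The event timestamp is assumed to be the first tuple field.
        return element.f0;
    }
}
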
My understanding of the data flow in this scenario is as follows:

1. Flink advances event time according to the timestamps of the data coming into the aggregatedTuple stream, extracting the timestamp and watermark for each message.
2. Since I use periodic watermarks, the watermark is updated at the default watermark interval (200 ms). Then, according to the current watermark, data with timestamps between the last watermark and the current watermark are released and go to the next steps (keyBy, timeWindow, reduce). If Flink receives a record but an appropriate watermark has not been emitted for it yet, Flink does not send that record to the next steps and holds it back until its watermark is emitted.
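
For reference, the 200 ms default comes from enabling event time, and the interval can also be set explicitly on the ExecutionConfig. A minimal setup sketch (assuming the usual StreamExecutionEnvironment bootstrap):

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Switching to event time sets the auto-watermark interval to 200 ms by default.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// The interval at which periodic assigners are asked for a new watermark
// can also be set explicitly (in milliseconds):
env.getConfig().setAutoWatermarkInterval(200);
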

Is that correct?

Re: watermark VS window trigger

vino yang
Hi Soheil,

I think some parts of your understanding are a bit off.

"After that according to the current watermark, data with the timestamp between the last watermark and current watermark will be released and go to the next steps"

The main role of the watermark here is to track the progress of event time, which serves as the time base for triggering the window. Upstream of the time window, watermarks are only generated at a specific interval; as they flow downstream, they advance the watermark (and thus the event time) of the downstream tasks.
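
To make the "specific interval" concrete: with a periodic assigner, getCurrentWatermark() is polled at the auto-watermark interval rather than per record, and the returned watermark is what flows downstream. A minimal sketch (the tuple layout is an assumption, and this is not the extractor from the original post):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class PeriodicAssignerSketch
        implements AssignerWithPeriodicWatermarks<Tuple2<Long, Integer>> {

    private long maxTimestampSeen = Long.MIN_VALUE;

    @Override
    public long extractTimestamp(Tuple2<Long, Integer> element, long previousElementTimestamp) {
        // Called once per record: assign the event timestamp (assumed in f0)
        // and remember the highest timestamp seen so far.
        maxTimestampSeen = Math.max(maxTimestampSeen, element.f0);
        return element.f0;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // Called periodically (at the auto-watermark interval), not per record.
        // The emitted watermark flows downstream and advances event time there.
        return new Watermark(maxTimestampSeen == Long.MIN_VALUE
                ? Long.MIN_VALUE : maxTimestampSeen - 1);
    }
}
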

You can read the "event time & watermarks" documentation [1].


Thanks, vino.




Re: watermark VS window trigger

Fabian Hueske-2
Hi,

Watermarks do not hold back records. Instead, they define the event time at an operator (as Vino said) and can trigger the processing of data if an operator's logic is based on time.
For example, a window operator can emit the complete result for a window once the event time has passed the window's end timestamp.
Operators that do not act on time, such as mappers or filters, emit records as soon as possible without waiting for watermarks.
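
To make that concrete, the event-time firing condition can be sketched as a custom trigger that fires once the current watermark has passed the window's end timestamp. This is a simplified sketch, not Flink's built-in EventTimeTrigger:

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class WatermarkPassedEndTrigger extends Trigger<Object, TimeWindow> {

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // The watermark already passed the end of the window: fire now.
            return TriggerResult.FIRE;
        }
        // Otherwise register an event-time timer; it fires when the watermark
        // reaches the window's end timestamp.
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        // The registered timer fired: the watermark has passed the window end.
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE; // purely event-time driven
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}
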

Best, Fabian
