Windows and Watermarks Clarification


Paul Joireman

Hi all,


Just a point of clarification on how watermarks are generated. I'd like to use a sliding event-time window (SlidingEventTimeWindows) of, say, 5 minutes with a 30-second slide. I can extract a timestamp from each element of the incoming data stream, but the elements may arrive out of order, so I implemented the following timestamp assigner:


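    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;

    // assignTimestampsAndWatermarks returns a new stream; keep it and apply the windows to it.
    DataStream<MyElement> withTimestamps = my_stream.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<MyElement>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(final MyElement element) {
                return element.getTimestamp();
            }
        });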

With this definition and the code for BoundedOutOfOrdernessTimestampExtractor, my understanding is that for each incoming element a watermark will be generated that is 10 seconds behind the current timestamp. If the end time of any of the sliding windows is earlier than an emitted watermark, that window (or those windows) will fire, initiating processing of the window(s). Is this correct?
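For reference, a 5-minute window that slides every 30 seconds means each element belongs to 300 / 30 = 10 overlapping windows, and one of those windows ends every 30 seconds of event time. The plain-Java sketch below lists the windows containing a given timestamp. It assumes window starts are aligned to multiples of the slide with zero offset; the class SlidingWindowSketch and its method are hypothetical illustrations, not Flink's SlidingEventTimeWindows source.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not a Flink API): lists the [start, end) sliding windows
// that contain a timestamp, assuming zero offset and starts aligned to the slide.
class SlidingWindowSketch {
    static List<long[]> windowsFor(long timestampMs, long sizeMs, long slideMs) {
        List<long[]> windows = new ArrayList<>();
        // Latest window start that is <= the timestamp.
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Step back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }

    public static void main(String[] args) {
        long size = 5 * 60_000L;  // 5 minutes
        long slide = 30_000L;     // 30 seconds
        List<long[]> windows = windowsFor(400_000L, size, slide);
        System.out.println(windows.size() + " windows");
        for (long[] w : windows) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

Because the window ends are staggered by the slide, an advancing watermark completes these windows one at a time rather than all at once.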

Paul


Re: Windows and Watermarks Clarification

Fabian Hueske
Hi Paul,

BoundedOutOfOrdernessTimestampExtractor implements the AssignerWithPeriodicWatermarks interface.
This means that Flink asks the assigner for the current watermark at regular intervals (configurable via StreamExecutionEnvironment.getConfig().setAutoWatermarkInterval()), rather than once per element.
The watermark is 10 seconds earlier than the highest timestamp observed so far.
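The behaviour described above can be sketched as a small plain-Java model. This is a simplified illustration, not Flink's class: BoundedOutOfOrdernessModel is a hypothetical name, and timestamps are assumed to be milliseconds. It shows that per-element calls only update the highest timestamp seen, while the watermark is computed only when it is requested, which in Flink happens once per auto-watermark interval.

```java
// Simplified plain-Java model (hypothetical, not Flink's class) of a
// bounded-out-of-orderness timestamp assigner with periodic watermarks.
class BoundedOutOfOrdernessModel {
    private final long maxOutOfOrdernessMs;
    // Highest timestamp seen so far; initialized so the first watermark is Long.MIN_VALUE.
    private long maxTimestampSeen;

    BoundedOutOfOrdernessModel(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.maxTimestampSeen = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    // Called for every element: records the timestamp, emits no watermark.
    long extractTimestamp(long elementTimestampMs) {
        maxTimestampSeen = Math.max(maxTimestampSeen, elementTimestampMs);
        return elementTimestampMs;
    }

    // Called periodically (once per auto-watermark interval), not per element.
    long currentWatermark() {
        return maxTimestampSeen - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessModel assigner = new BoundedOutOfOrdernessModel(10_000L);
        long[] arrivals = {100_000L, 95_000L, 120_000L, 110_000L};  // out of order
        for (long ts : arrivals) {
            assigner.extractTimestamp(ts);
        }
        // Highest timestamp is 120 000 ms, so the watermark is 110 000 ms.
        System.out.println(assigner.currentWatermark());  // prints 110000
    }
}
```

Late-arriving elements such as 95 000 and 110 000 never move the watermark backwards, because only the maximum timestamp feeds into it.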

An event-time window is evaluated when the current watermark is higher (later) than the window's end time. With allowedLateness(), the window evaluation can be deferred so that late elements (elements whose timestamp is before the current watermark) can still join the window before it is evaluated.
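A minimal sketch of the firing condition, assuming (as with Flink's TimeWindow) that a window [start, end) has a last timestamp of end - 1 ms and that the window is evaluated once the watermark reaches that timestamp. EventTimeFiringSketch is a hypothetical name; the demo reuses the 10-second out-of-orderness bound from the question.

```java
// Hypothetical sketch (not Flink code) of the event-time firing condition
// for a window [start, end) whose last timestamp is end - 1 ms.
class EventTimeFiringSketch {
    static boolean shouldFire(long windowEndMs, long watermarkMs) {
        long maxTimestamp = windowEndMs - 1;  // last millisecond inside the window
        return watermarkMs >= maxTimestamp;
    }

    public static void main(String[] args) {
        long windowEnd = 300_000L;      // window [0 s, 300 s)
        long outOfOrderness = 10_000L;  // 10 s bound, as in the question
        long[] highestTimestampSeen = {295_000L, 305_000L, 310_000L};
        for (long maxTs : highestTimestampSeen) {
            // Watermark = highest timestamp seen - 10 s.
            long watermark = maxTs - outOfOrderness;
            System.out.println(maxTs + " -> watermark " + watermark
                    + ", fire: " + shouldFire(windowEnd, watermark));
        }
    }
}
```

With a 10-second bound, the window ending at 300 s is only evaluated once elements with timestamps of roughly 310 s have been seen.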

Let me know if you have further questions,
Fabian


In reply to Paul Joireman, 2016-09-01 20:16 GMT+02:00.


Re: Windows and Watermarks Clarification

Paul Joireman

Thanks Fabian,


This is making more sense. Is allowedLateness(Time.seconds(x)) then evaluated relative to maxEventTime - lastWaterMarkTime? That is, if (maxEventTime - lastWaterMarkTime) > x * 1000, is the window evaluated?


Paul


In reply to Fabian Hueske, Thursday, September 1, 2016 1:25:55 PM.

Re: Windows and Watermarks Clarification

Fabian Hueske
A 10-minute tumbling window that starts at 12:00 is evaluated after a watermark > 12:10 is observed.
If the same tumbling window has an allowed lateness of 5 minutes, it is evaluated once a watermark > 12:15 is observed. However, only elements with timestamps 12:00 <= x < 12:10 are in the window.
Elements that arrive after the allowed lateness period has passed are simply dropped.
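The window boundaries and the end of the lateness period in this example can be checked with a little arithmetic. The sketch below is hypothetical (AllowedLatenessSketch is not a Flink class) and assumes Flink's convention that a window's state is cleaned up once the watermark reaches the window's last timestamp (end - 1 ms) plus the allowed lateness. Times are milliseconds since midnight.

```java
// Hypothetical sketch (not Flink code): a tumbling window [12:00, 12:10)
// with 5 minutes of allowed lateness. Times are milliseconds since midnight.
class AllowedLatenessSketch {
    static long at(int hour, int minute) {
        return (hour * 60L + minute) * 60_000L;
    }

    static final long START = at(12, 0);
    static final long END = at(12, 10);
    static final long LATENESS = 5 * 60_000L;

    // Only timestamps in [12:00, 12:10) belong to the window.
    static boolean inWindow(long ts) {
        return START <= ts && ts < END;
    }

    // Watermark at which the window state is discarded: (end - 1 ms) + lateness.
    static long cleanupTime() {
        return (END - 1) + LATENESS;
    }

    // An element is still added to the window while the watermark is below cleanup time.
    static boolean acceptsElement(long ts, long watermark) {
        return inWindow(ts) && watermark < cleanupTime();
    }

    public static void main(String[] args) {
        System.out.println(inWindow(at(12, 10)));                     // false: end is exclusive
        System.out.println(acceptsElement(at(12, 5), at(12, 14)));    // true: within lateness
        System.out.println(acceptsElement(at(12, 5), at(12, 15)));    // false: lateness is over
    }
}
```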

Best, Fabian

In reply to Paul Joireman, 2016-09-01 20:42 GMT+02:00.


Re: Windows and Watermarks Clarification

Aljoscha Krettek
Just one clarification: even with a specified allowed lateness, the window is still evaluated once the watermark passes the end of the window. With allowed lateness, the window contents and state are simply kept around a bit longer so that late elements can still update the results. What happens when late elements arrive depends on the trigger. With the default EventTimeTrigger, each late element causes a new firing that processes the previously available elements together with the new (late-arriving) element.
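The firing sequence described here can be illustrated with a small model. This is a hypothetical sketch (LateFiringWindow is not a Flink class) of a single tumbling window with allowed lateness and a trigger that fires when the watermark passes the window's last timestamp, fires again for each late element without purging earlier contents, and drops elements once the lateness period is over. Time units are abstract; the demo reads them as minutes after 12:00 to mirror the earlier example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model (not Flink code) of one tumbling window [start, end)
// with allowed lateness and a fire-without-purge event-time trigger.
class LateFiringWindow {
    private final long start;
    private final long end;
    private final long allowedLateness;
    private final List<Long> contents = new ArrayList<>();
    private long watermark = Long.MIN_VALUE;
    // Number of elements the window function sees at each firing.
    final List<Integer> firings = new ArrayList<>();

    LateFiringWindow(long start, long end, long allowedLateness) {
        this.start = start;
        this.end = end;
        this.allowedLateness = allowedLateness;
    }

    private long maxTimestamp() { return end - 1; }
    private long cleanupTime() { return maxTimestamp() + allowedLateness; }

    void onElement(long ts) {
        if (ts < start || ts >= end) return;     // belongs to another window
        if (cleanupTime() <= watermark) return;  // past allowed lateness: dropped
        contents.add(ts);
        if (maxTimestamp() <= watermark) {
            firings.add(contents.size());        // late element: fire again with all contents
        }
    }

    void onWatermark(long wm) {
        long previous = watermark;
        watermark = Math.max(watermark, wm);
        boolean justPassedEnd = previous < maxTimestamp() && maxTimestamp() <= watermark;
        if (justPassedEnd && !contents.isEmpty()) {
            firings.add(contents.size());        // regular on-time firing
        }
        if (cleanupTime() <= watermark) {
            contents.clear();                    // state discarded after the lateness period
        }
    }

    public static void main(String[] args) {
        LateFiringWindow w = new LateFiringWindow(0, 10, 5);  // minutes after 12:00
        w.onElement(1);
        w.onElement(5);
        w.onWatermark(11);  // on-time firing with 2 elements
        w.onElement(7);     // late but within lateness: fires again with 3 elements
        w.onWatermark(16);  // lateness over, state cleared
        w.onElement(8);     // dropped, no firing
        System.out.println(w.firings);  // prints [2, 3]
    }
}
```

Because the trigger fires without purging, the late firing sees all three elements rather than only the late one.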

Cheers,
Aljoscha

In reply to Fabian Hueske, Thu, 1 Sep 2016 at 21:15.