BoundedOutOfOrdernessTimestampExtractor and allowedlateness

Yassine MARZOUGUI (2016-10-17 11:52 GMT+02:00)
Hi,

I'm a bit confused about how Flink deals with late elements after the introduction of allowedLateness for windows. What is the difference between using a BoundedOutOfOrdernessTimestampExtractor(Time.seconds(X)) and allowedLateness(Time.seconds(X))? What if one is used and the other is not? And what if a different lateness is used in each one? Could you please clarify this with a simple example? Thank you.

Best,
Yassine

Re: BoundedOutOfOrdernessTimestampExtractor and allowedlateness

Fabian Hueske-2 (2016-10-17 16:24 GMT+02:00)
Hi Yassine,

The difference is the following:

1) The BoundedOutOfOrdernessTimestampExtractor is a built-in timestamp extractor and watermark assigner.
A timestamp extractor tells Flink when an event happened, i.e., it extracts a timestamp from the event. A watermark assigner tells Flink what the current logical time is.
The BoundedOutOfOrdernessTimestampExtractor works as follows: when Flink asks what the current time is, it returns the latest observed timestamp minus a configurable bound. This bound is the safety margin for late data.
A record whose timestamp is lower than the last watermark is considered to be late.
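
To make the watermark rule concrete, here is a minimal sketch in plain Python. It is not the Flink API: the class name `BoundedOutOfOrdernessSketch`, its methods, and the `minutes` helper are made up for illustration, timestamps are simplified to whole minutes, and lateness is checked against the currently computed watermark rather than the last one emitted.

```python
def minutes(hhmm):
    """Convert an "HH:MM" string to minutes since midnight (illustrative helper)."""
    h, m = hhmm.split(":")
    return int(h) * 60 + int(m)


class BoundedOutOfOrdernessSketch:
    """Toy model of the rule: watermark = latest observed timestamp - bound.

    Hypothetical class for illustration only; it is not Flink's implementation.
    """

    def __init__(self, bound):
        self.bound = bound      # safety margin, in minutes
        self.max_seen = None    # latest event timestamp observed so far

    def on_event(self, timestamp):
        # Only a new maximum timestamp can move the watermark forward.
        if self.max_seen is None or timestamp > self.max_seen:
            self.max_seen = timestamp

    def current_watermark(self):
        # No watermark can be derived before the first event is seen.
        if self.max_seen is None:
            return None
        return self.max_seen - self.bound

    def is_late(self, timestamp):
        # Late means: the timestamp is lower than the current watermark.
        wm = self.current_watermark()
        return wm is not None and timestamp < wm


gen = BoundedOutOfOrdernessSketch(bound=2)
gen.on_event(minutes("12:01"))
gen.on_event(minutes("12:04"))
print(gen.current_watermark() == minutes("12:02"))
print(gen.is_late(minutes("12:01")), gen.is_late(minutes("12:02")))
```

With a 2-minute bound, seeing 12:04 puts the watermark at 12:02, so a record stamped 12:01 would count as late while one stamped 12:02 would not.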

2) The allowedLateness parameter of time windows tells Flink how long to keep state around after the window was evaluated.
If data arrives after the evaluation and before the allowedLateness has passed, the window function is applied again and an update is sent out.

Let's look at an example.
Assume you have a BoundedOutOfOrdernessTimestampExtractor (BOOTE) with a 2-minute bound and a 10-minute tumbling window that starts at 12:00 and ends at 12:10:

If you have the following data:

12:01, A
12:04, B
WM, 12:02 // 12:04 - 2 minutes
12:02, C
12:08, D
12:14, E
WM, 12:12 // 12:14 - 2 minutes
12:16, F
WM, 12:14 // 12:16 - 2 minutes
12:09, G

== no allowed lateness
The window operator advances its logical time to 12:12 when it receives <WM, 12:12>, evaluates the window, which contains [A, B, C, D] at this time, and then purges its state. When <12:09, G> arrives later, it is ignored.

== allowed lateness of 3 minutes
The window operator evaluates the window when <WM, 12:12> is received, but its state is not purged yet. The state is purged when <WM, 12:14> is received, because the watermark has then passed 12:13 (window end time 12:10 + 3 minutes of allowed lateness). <12:09, G> arrives after the purge and is again ignored.

== allowed lateness of 5 minutes
The window operator evaluates the window when <WM, 12:12> is received, but its state is not purged yet. When <12:09, G> is received, the window is evaluated again, this time with [A, B, C, D, G], and an update is sent out. The state is purged when a watermark of >= 12:15 is received (window end time 12:10 + 5 minutes of allowed lateness).
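
The three cases can be replayed with a small simulation. This is a sketch in plain Python, not Flink code: `replay`, `minutes`, and the `STREAM` encoding are hypothetical names, timestamps are whole minutes, and the window is assumed to fire once when the watermark reaches the window end (12:10) and to be purged once the watermark reaches the window end plus the allowed lateness.

```python
def minutes(hhmm):
    """Convert an "HH:MM" string to minutes since midnight (illustrative helper)."""
    h, m = hhmm.split(":")
    return int(h) * 60 + int(m)


# The example stream in arrival order: ("event", time, label) or ("wm", time, None).
STREAM = [
    ("event", "12:01", "A"),
    ("event", "12:04", "B"),
    ("wm", "12:02", None),
    ("event", "12:02", "C"),
    ("event", "12:08", "D"),
    ("event", "12:14", "E"),
    ("wm", "12:12", None),
    ("event", "12:16", "F"),
    ("wm", "12:14", None),
    ("event", "12:09", "G"),
]


def replay(stream, start, end, allowed_lateness):
    """Replay the stream for the single window [start, end).

    Returns (results, ignored): every emitted window result, and the labels
    of events ignored because the window state was already purged.
    """
    start, end = minutes(start), minutes(end)
    contents, results, ignored = [], [], []
    fired = purged = False
    for kind, time, label in stream:
        ts = minutes(time)
        if kind == "wm":
            if not fired and ts >= end:
                fired = True
                results.append(list(contents))      # first evaluation
            if not purged and ts >= end + allowed_lateness:
                purged = True
                contents.clear()                    # state is discarded
        elif start <= ts < end:                     # E and F fall into a later window
            if purged:
                ignored.append(label)               # too late, state is gone
            else:
                contents.append(label)
                if fired:
                    results.append(list(contents))  # late update
    return results, ignored


for lateness in (0, 3, 5):
    print(lateness, replay(STREAM, "12:00", "12:10", lateness))
```

Under these assumptions, a lateness of 0 or 3 minutes yields the single result [A, B, C, D] with G ignored, while 5 minutes yields a second, updated result [A, B, C, D, G].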

So, watermarks tell Flink what time it is, and allowed lateness tells the system when state should be discarded and all later-arriving data ignored.
These issues are related but not exactly the same thing. For instance, you can counter late data by increasing either the bound or the lateness parameter.
Increasing the watermark bound will yield higher latencies because windows are evaluated later.
Configuring allowedLateness will allow for earlier results, but you have to cope with the updates downstream.

Please let me know if you have questions.

Best, Fabian

Re: BoundedOutOfOrdernessTimestampExtractor and allowedlateness

Fabian Hueske-2 (2016-10-17 16:29 GMT+02:00)
I have to extend my answer:

The allowedLateness behavior that I described applies only if the window trigger returns FIRE when the window is evaluated (this is the default behavior of most triggers).

If the trigger returns FIRE_AND_PURGE, the state of the window is purged when the function is evaluated and late events are processed alone, i.e., in my example <12:09, G> would be processed without [A, B, C, D].
Once the allowed lateness has passed, all window state is purged regardless of the trigger.
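
The difference between FIRE and FIRE_AND_PURGE can be sketched with a small simulation of the example stream. This is plain Python rather than Flink code, with hypothetical names (`replay_with_trigger`, `purge_on_fire`) and whole-minute timestamps; FIRE_AND_PURGE is modeled simply as clearing the window contents after each evaluation, and the final cleanup happens once the watermark reaches the window end plus the allowed lateness.

```python
def minutes(hhmm):
    """Convert an "HH:MM" string to minutes since midnight (illustrative helper)."""
    h, m = hhmm.split(":")
    return int(h) * 60 + int(m)


# Events of the example that fall into the 12:00-12:10 window, plus the
# watermarks, in arrival order (E and F are omitted: they belong to a later window).
STREAM = [
    ("event", "12:01", "A"), ("event", "12:04", "B"), ("wm", "12:02", None),
    ("event", "12:02", "C"), ("event", "12:08", "D"), ("wm", "12:12", None),
    ("wm", "12:14", None), ("event", "12:09", "G"),
]


def replay_with_trigger(stream, end, allowed_lateness, purge_on_fire):
    """Return every window result emitted for the window ending at `end`."""
    end = minutes(end)
    contents, results = [], []
    fired = cleaned_up = False

    def evaluate():
        results.append(list(contents))
        if purge_on_fire:           # models FIRE_AND_PURGE
            contents.clear()

    for kind, time, label in stream:
        ts = minutes(time)
        if kind == "wm":
            if not fired and ts >= end:
                fired = True
                evaluate()
            if ts >= end + allowed_lateness:
                cleaned_up = True   # allowed lateness has passed
                contents.clear()    # purged regardless of the trigger
        elif not cleaned_up:
            contents.append(label)
            if fired:
                evaluate()          # late element triggers another evaluation
    return results


print(replay_with_trigger(STREAM, "12:10", 5, purge_on_fire=False))
print(replay_with_trigger(STREAM, "12:10", 5, purge_on_fire=True))
```

With 5 minutes of allowed lateness, the FIRE variant emits [A, B, C, D] and then [A, B, C, D, G], while the FIRE_AND_PURGE variant emits [A, B, C, D] and then [G] on its own.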

Best, Fabian

Re: BoundedOutOfOrdernessTimestampExtractor and allowedlateness

Yassine MARZOUGUI
Hi Fabian,

Thank you very much for the great answer and example, I appreciate it!
It is all clear now.

Best,
Yassine
