Race between window assignment and same window timeout

Shimony, Shay

Hi,

We seem to be hitting a race between the aggregation thread and the Time Trigger thread.

It may not be a bug, but it looks strange to us, and we would appreciate your help fixing it or working around it.

 

First, some details about our use case and system:

- We are working with processing time.
- We are using Flink 1.4.
- We use a customized sliding window of size 1 minute with a slide of 10 seconds. We think the problem can also happen with a tumbling window, so for simplicity let's assume a tumbling window of 1 minute.
- Our window Trigger returns FIRE on each element.
- We have a constant, balanced rate of 2k incoming messages per second.
- By "window state" I simply mean the aggregation value held in the window.

 

If an element's timestamp is very close to the end of a window, the element is of course assigned to that window. Occasionally, however, the window times out and is cleared before the element has been aggregated into the window state. We then lose the previous aggregation value and end up with a new aggregation state that contains only the element's value.

 

Below is the story as seen by the threads. Timestamps are logical.

Suppose we are in the beginning of WindowOperator.processElement.
Current time: 119 (nearly 120)

Reducer thread                                   | Time Trigger thread
-------------------------------------------------+----------------------------------
Assign element to window [60, 120], because      |
context.getCurrentProcessingTime() returned 119  |
(in assignWindows)                               |
                                                 | Time is 120 -> clear window state
Add the element value to window state [60, 120]  |
(it starts from new state)                       |
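
The three steps in the table can be replayed deterministically with a small model. This is only a sketch of the reported symptom, not Flink code: the class name, the map standing in for window state, and the sample values 100 and 5 are all hypothetical, and the sketch does not claim that Flink really interleaves the two threads this way.

```java
import java.util.HashMap;
import java.util.Map;

public class WindowRaceReplay {
    /** Start of the tumbling window that contains the given timestamp. */
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    /**
     * Replays the sequence from the table in logical time units.
     * Hypothetical model: window state is a map from window start to a sum.
     */
    static long replay(long previousSum, long elementValue, boolean clearBetween) {
        long size = 60;
        Map<Long, Long> state = new HashMap<>();
        state.put(60L, previousSum);             // existing aggregate of window [60, 120]

        long window = windowStart(119, size);    // step 1: clock reads 119, element assigned
        if (clearBetween) {
            state.remove(window);                // step 2: cleanup runs and clears the state
        }
        state.merge(window, elementValue, Long::sum); // step 3: element is aggregated
        return state.get(window);
    }

    public static void main(String[] args) {
        System.out.println(replay(100, 5, false)); // 105: previous aggregate kept
        System.out.println(replay(100, 5, true));  // 5: previous aggregate lost
    }
}
```

With the cleanup step in between, the element is added to an empty state, which matches the "starts from new state" observation.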

 

 

Our questions:

1. Is this a legitimate race? We expected (1) assigning an element to a window and aggregating it into the window state, and (2) clearing the window, to be atomic with respect to each other. That is, if an element is valid for a window, it is assigned to that window and fully aggregated into its state, and only then can the window be cleared.

2. How can we make the Time Trigger thread wait a little before cleaning the window, for example by adding 500 ms to the scheduled cleanup time? We thought of overriding WindowOperator.cleanupTime. Is there an easy way to replace WindowOperator with our own?

3. Do you perhaps have a different idea for working around it?

 

Thanks!

Shay

 

Re: Race between window assignment and same window timeout

Andrey Zagrebin
Hi Shay,

I would suggest trying Allowed Lateness, for example with the 500 ms you mention:
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/windows.html#allowed-lateness
It might also work for processing time.

Cheers,
Andrey
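
In the DataStream API, allowed lateness is set with allowedLateness(...) on the windowed stream. How it would shift the cleanup time that question 2 asks about can be sketched as below. This is a simplified model, not the Flink source: the class and method are hypothetical, and the branch that ignores lateness for processing-time windows is an assumption that should be checked against WindowOperator.cleanupTime in the Flink version in use.

```java
public class CleanupTimeSketch {
    /**
     * Time at which a window's state would be cleaned up.
     * Assumption (verify against your Flink version): lateness is only
     * added for event-time windows, not for processing-time windows.
     */
    static long cleanupTime(long maxTimestamp, long allowedLateness, boolean eventTime) {
        if (!eventTime) {
            return maxTimestamp;
        }
        long t = maxTimestamp + allowedLateness;
        // Guard against long overflow for very large lateness values.
        return t >= maxTimestamp ? t : Long.MAX_VALUE;
    }

    public static void main(String[] args) {
        long maxTimestamp = 119_999L; // last millisecond of window [60000, 120000)
        System.out.println(cleanupTime(maxTimestamp, 500L, true));  // 120499
        System.out.println(cleanupTime(maxTimestamp, 500L, false)); // 119999
    }
}
```

If that assumption holds, a 500 ms lateness delays cleanup only for event-time windows, so it is worth testing whether the setting has any effect in a processing-time job before relying on it.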

On 18 Jul 2018, at 17:22, Shimony, Shay <[hidden email]> wrote:

[...]

Re: Race between window assignment and same window timeout

Fabian Hueske-2
Hi Shay,

This sounds very much like the off-by-one bug described in FLINK-9857 [1].
The problem was identified in another recent thread on the user mailing list and is fixed in Flink 1.5.2 and 1.6.0.

Best, Fabian
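
The thread does not spell out the bug, so the following is only one way an off-by-one of this kind can arise, sketched under stated assumptions: a window [start, end) has end - 1 as its last valid timestamp, its cleanup timer is registered for that timestamp, and a timer may fire as soon as the clock reaches its registered time plus some extra delay. The class, method names, and the extraDelay parameter are hypothetical.

```java
public class TimerOffByOne {
    /** Last timestamp that still belongs to a window ending at windowEnd (exclusive). */
    static long maxTimestamp(long windowEnd) {
        return windowEnd - 1;
    }

    /** Earliest clock reading at which a timer registered for timerTime may fire. */
    static long earliestFire(long timerTime, long extraDelay) {
        return timerTime + extraDelay;
    }

    /**
     * True if an element whose clock reading is elementClock is still assigned
     * to the window although the cleanup timer may already have fired.
     */
    static boolean canLoseState(long windowEnd, long extraDelay, long elementClock) {
        long max = maxTimestamp(windowEnd);
        boolean elementInWindow = elementClock <= max;
        boolean timerMayHaveFired = earliestFire(max, extraDelay) <= elementClock;
        return elementInWindow && timerMayHaveFired;
    }

    public static void main(String[] args) {
        System.out.println(canLoseState(120, 0, 119)); // true: timer and element share tick 119
        System.out.println(canLoseState(120, 1, 119)); // false: timer cannot fire before tick 120
    }
}
```

In this model, delaying the timer by one time unit is enough to guarantee that every element assigned to the window is aggregated before cleanup runs.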


2018-07-18 19:00 GMT+02:00 Andrey Zagrebin <[hidden email]>:
[...]