Trying to comprehend rolling windows + event time


lofifnc
Hi,

I have a setup where I'm feeding a rolling window with event-time records:
https://gist.github.com/lofifnc/dd946fef6f4b3eb25ef1 (obviously I'm using Flinkspector)

The first case behaves as expected. I'm emitting three records, all of which fall within the time frame of the first window firing:
("hans", "elephant", 15) at second 0 with watermark 0
("susi", "arctic", 20) at second 30 with watermark 30
("pete", "elephant", 40) at second 50 with watermark 50
(You can also see this in the console output.)
The result is:
        (susi,arctic,20)
        (hans,elephant,55)
Each appears 5 times, as expected.
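To see why each result appears exactly 5 times, here is a minimal, stdlib-only Java sketch of sliding event-time window assignment. It assumes a window size of 5 minutes and a slide of 1 minute (inferred from the window bounds printed later in this thread, not taken from the gist), plus the usual assignment rule: start at the last slide boundary at or before the timestamp, then step back one slide while the window still covers it. `SlidingAssign` and `assign` are hypothetical names, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not Flink API: models sliding event-time window
// assignment for non-negative timestamps in milliseconds.
public class SlidingAssign {
    static final long MINUTE = 60_000L;
    static final long SIZE = 5 * MINUTE;   // assumed window size
    static final long SLIDE = 1 * MINUTE;  // assumed slide

    // Returns the [start, end) bounds of every window containing ts.
    static List<long[]> assign(long ts, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = ts - ts % slide; // latest slide boundary <= ts
        for (long start = lastStart; start > ts - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        long[] timestampsSec = {0, 30, 50}; // hans, susi, pete
        for (long sec : timestampsSec) {
            StringBuilder sb = new StringBuilder("t=" + sec + "s ->");
            for (long[] w : assign(sec * 1000, SIZE, SLIDE)) {
                sb.append(" [").append(w[0] / MINUTE).append(", ")
                  .append(w[1] / MINUTE).append(")min");
            }
            System.out.println(sb);
        }
    }
}
```

Under these assumptions all three timestamps land in the same five windows, [-4, 1) through [0, 5) minutes, so each per-key aggregate is emitted once per window, i.e. 5 times.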

In the second case it gets interesting: I'm emitting a 4th record, which is not evaluated in the first firing of the window:
("hans", "elephant", 15) at second 0 with watermark 0
("susi", "arctic", 20) at second 30 with watermark 30
("pete", "elephant", 40) at second 50 with watermark 50
("grace", "arctic", 25) at second 90 with watermark 90
Now the second evaluation of the window looks like this:
        (grace,arctic,25)
        (susi,arctic,45)
        (hans,elephant,55)

The 4th record is emitted in a non-aggregated form, but the aggregate for the "arctic" key has been updated.
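A sketch of the second case under the same assumptions (5-minute windows sliding by 1 minute), adding a per-key sum of the third field that keeps the other fields of the first element seen, which matches the names in the output above. Under these assumptions, the seemingly non-aggregated (grace,arctic,25) is simply the sum for the window [1, 6) minutes, which contains grace alone. `WindowSums`, `Agg`, and `run` are hypothetical names; this simulates the semantics and does not call Flink.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation (not Flink API) of a keyed sliding window with a
// sum over the third field. Assumes size 5 min, slide 1 min, and that the
// sum keeps the name of the first element seen for each (window, key).
public class WindowSums {
    static final long MINUTE = 60_000L;
    static final long SIZE = 5 * MINUTE;
    static final long SLIDE = MINUTE;

    // Aggregate for one (window, key): first element's name plus running sum.
    static final class Agg {
        final String name;
        int sum;
        Agg(String name, int sum) { this.name = name; this.sum = sum; }
    }

    // Window start (in minutes) -> key -> aggregate; keys kept in arrival order.
    static Map<Long, Map<String, Agg>> run(String[] names, String[] keys, int[] counts, long[] tsMillis) {
        Map<Long, Map<String, Agg>> result = new TreeMap<>();
        for (int i = 0; i < names.length; i++) {
            long ts = tsMillis[i];
            for (long start = ts - ts % SLIDE; start > ts - SIZE; start -= SLIDE) {
                Map<String, Agg> perKey = result.computeIfAbsent(start / MINUTE, s -> new LinkedHashMap<>());
                Agg agg = perKey.get(keys[i]);
                if (agg == null) {
                    perKey.put(keys[i], new Agg(names[i], counts[i]));
                } else {
                    agg.sum += counts[i];
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String[] names = {"hans", "susi", "pete", "grace"};
        String[] keys = {"elephant", "arctic", "elephant", "arctic"};
        int[] counts = {15, 20, 40, 25};
        long[] ts = {0, 30_000, 50_000, 90_000};
        for (Map.Entry<Long, Map<String, Agg>> w : run(names, keys, counts, ts).entrySet()) {
            long start = w.getKey();
            for (Map.Entry<String, Agg> e : w.getValue().entrySet()) {
                System.out.println("[" + start + ", " + (start + SIZE / MINUTE) + ")min ("
                    + e.getValue().name + "," + e.getKey() + "," + e.getValue().sum + ")");
            }
        }
    }
}
```

The windows [-3, 2) through [0, 5) each yield (susi,arctic,45) and (hans,elephant,55), while [-4, 1) predates grace and yields (susi,arctic,20).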

In the third case it gets really interesting: I'm emitting a 5th record, which falls into the third evaluation of the window.
This time the record is shown twice in the output:
        (sven,elephant,5)
        (grace,arctic,25)
        (sven,elephant,5)
        (hans,elephant,60)

I played around with this a bit: if you insert a record into the 4th evaluation, it shows up 3 times; in the 5th, 4 times; and so on. The test itself is deterministic across runs.

Lastly, I've been able to reproduce the behavior using only vanilla Flink, so I'm fairly certain this is not a side effect of Flinkspector.

I'm slowly getting a headache trying to wrap my head around why I'm seeing this behavior, but I can't find a satisfying explanation.

Best Alex!

Re: Trying to comprehend rolling windows + event time

lofifnc
I should add that I'm using version 0.10.1.

Re: Trying to comprehend rolling windows + event time

Aljoscha Krettek
Hi,
in the second case: do you know what watermark is emitted after ("grace", "arctic", 25)? I imagine it is Long.MAX_VALUE, since otherwise not all of the windows would have been triggered. If there are no intermediate watermarks and we jump directly to the last (Long.MAX_VALUE) watermark, then all in-flight windows will be emitted. The order in which they are emitted is arbitrary, and I guess the lone ("grace", "arctic", 25) seen first in the output actually belongs to the window (2 min to 7 min).

You could use a WindowFunction to also print the window each emitted element belongs to.

Cheers,
Aljoscha
> On 19 Feb 2016, at 13:23, lofifnc <[hidden email]> wrote:
>
> I should add i'm using version 0.10.1

Re: Trying to comprehend rolling windows + event time

lofifnc
Hi,

You're right, except that ("grace", "arctic", 25) is emitted with a timestamp of 90 seconds along with a watermark of 90 seconds.

I followed your advice and implemented a simple window function that prints the start and end of each window along with its contents. You can see that a window from minute 1 to minute 6 is emitted containing the ("grace", "arctic", 25) triple.

window: -4 -> 1 key: (arctic)
(susi,arctic,20)
===================
window: -4 -> 1 key: (elephant)
(hans,elephant,15)
(pete,elephant,40)
===================
window: 1 -> 6 key: (arctic)
(grace,arctic,25)
===================
window: 0 -> 5 key: (elephant)
(hans,elephant,15)
(pete,elephant,40)
===================
window: 0 -> 5 key: (arctic)
(susi,arctic,20)
(grace,arctic,25)
===================
window: -1 -> 4 key: (arctic)
(susi,arctic,20)
(grace,arctic,25)
===================
window: -1 -> 4 key: (elephant)
(hans,elephant,15)
(pete,elephant,40)
===================
window: -2 -> 3 key: (arctic)
(susi,arctic,20)
(grace,arctic,25)
===================
window: -2 -> 3 key: (elephant)
(hans,elephant,15)
(pete,elephant,40)
===================
window: -3 -> 2 key: (arctic)
(susi,arctic,20)
(grace,arctic,25)
===================
window: -3 -> 2 key: (elephant)
(hans,elephant,15)
(pete,elephant,40)
===================

So what basically happens is that the windowing mechanism emits all unfinished windows when it is closed?
Based on the watermarks alone, Flink cannot have decided that the window 1 -> 6 is finished.

Best Alex.

Re: Trying to comprehend rolling windows + event time

Aljoscha Krettek
Hi, 
yes, in version 0.10.x there is a final, implicit Long.MAX_VALUE watermark that flushes out all remaining windows at the end. We are changing this for the 1.0 release because this behaviour can be unexpected.
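The effect of that final Long.MAX_VALUE watermark on the second case can be sketched as a small simulation. Assumptions, all noted in the code: 5-minute windows sliding by 1 minute; the event-time rule that a window [start, end) fires once the watermark reaches end - 1, its last covered timestamp; all four records arriving before the 90-second watermark (the earlier 0/30/50-second watermarks fire nothing in any case); and windows tracked without keys, since per-key windows would fire at the same moments. `FlushDemo` and `fire` are hypothetical names, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical simulation, not Flink API. Assumes 5-minute windows sliding
// by 1 minute and that a window [start, end) fires once the watermark
// reaches end - 1. Keys are ignored: per-key windows fire at the same time.
public class FlushDemo {
    static final long MINUTE = 60_000L;
    static final long SIZE = 5 * MINUTE;
    static final long SLIDE = MINUTE;

    // For each watermark (in order), returns the window starts (in minutes) it fires.
    // Simplification: all timestamps are assigned before the first watermark.
    static List<List<Long>> fire(long[] timestamps, long[] watermarks) {
        TreeSet<Long> pending = new TreeSet<>(); // window starts not yet fired
        for (long ts : timestamps) {
            for (long start = ts - ts % SLIDE; start > ts - SIZE; start -= SLIDE) {
                pending.add(start);
            }
        }
        List<List<Long>> firedPerWatermark = new ArrayList<>();
        for (long wm : watermarks) {
            List<Long> fired = new ArrayList<>();
            for (Long start : new ArrayList<>(pending)) { // copy: we remove while scanning
                if (wm >= start + SIZE - 1) {
                    fired.add(start / MINUTE);
                    pending.remove(start);
                }
            }
            firedPerWatermark.add(fired);
        }
        return firedPerWatermark;
    }

    public static void main(String[] args) {
        long[] ts = {0, 30_000, 50_000, 90_000};  // hans, susi, pete, grace
        long[] wms = {90_000, Long.MAX_VALUE};     // last real watermark, then the final flush
        List<List<Long>> fired = fire(ts, wms);
        System.out.println("fired at watermark 90s: " + fired.get(0));
        System.out.println("fired at Long.MAX_VALUE: " + fired.get(1));
    }
}
```

Under these assumptions only the [-4, 1) minute window is complete at the 90-second watermark; the other five, including the lone-grace window [1, 6), are emitted only by the final flush. The simulation lists them in sorted order, whereas the real emission order is arbitrary, as noted earlier in the thread.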

On Sat, Feb 20, 2016, 17:32 lofifnc <[hidden email]> wrote:

Re: Trying to comprehend rolling windows + event time

nsengupta
In reply to this post by lofifnc

Hello Alex,

Many thanks for the explanation. '5 different windows' - that's the key. I missed that completely. Thanks for plugging the hole; I think I understand the behaviour better now.

I will follow your code snippet (gist).

And many more thanks for sharing the write-ups by Tyler Akidau! They are fantastic. I was not aware of them at all; being a stream-processing enthusiast, I probably should have been.
 
-- Nirmalya


-- 
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is where they should be.
Now put the foundation under them."