Hi,
I have a setup where I'm feeding a rolling window with event time: https://gist.github.com/lofifnc/dd946fef6f4b3eb25ef1 (obviously I'm using Flinkspector).

The first case behaves as expected. I'm emitting three records, all of which fall within the time frame of the first window triggering:

("hans", "elephant", 15) at second 0 with watermark 0
("susi", "arctic", 20) at second 30 with watermark 30
("pete", "elephant", 40) at second 50 with watermark 50

(You can see this in the console output as well.) The result I'm getting is:

(susi,arctic,20)
(hans,elephant,55)

each 5 times, as expected.

In the second case it gets interesting, as I'm emitting a 4th record which does not get evaluated within the first triggering of the window:

("hans", "elephant", 15) at second 0 with watermark 0
("susi", "arctic", 20) at second 30 with watermark 30
("pete", "elephant", 40) at second 50 with watermark 50
("grace", "arctic", 25) at second 90 with watermark 90

Now the second evaluation of the window looks like this:

(grace,arctic,25)
(susi,arctic,45)
(hans,elephant,55)

The 4th record is emitted in a non-aggregated form, but the grouping for the "arctic" key has also been updated.

In the third case it gets really interesting. I'm emitting a 5th record which falls into the third evaluation of the window. This time the record is shown twice in the output:

(sven,elephant,5)
(grace,arctic,25)
(sven,elephant,5)
(hans,elephant,60)

I played around with this a little: if you insert a record into the 4th evaluation it is shown 3 times, in the 5th it appears in the output 4 times, and so on. The test itself is deterministic between test runs. Lastly, I've been able to reproduce the behavior using only vanilla Flink, so I'm fairly certain this is not a side effect of Flinkspector.

I'm slowly getting a headache trying to wrap my head around why I'm seeing this behavior, and I can't find a satisfying explanation.

Best Alex!
I should add that I'm using version 0.10.1.
Hi,
in the second case: do you know what watermark is emitted after ("grace", "arctic", 25) is emitted? I imagine it is Long.MAX_VALUE, since otherwise not all of the windows would have been triggered. If there are no intermediate watermarks and we jump directly to the last (Long.MAX_VALUE) watermark, then all in-flight windows will be emitted. The order in which they are emitted is arbitrary, and I guess the lone ("grace", "arctic", 25) seen first in the output actually belongs to window (2 min to 7 min). Using a WindowFunction, you could also print the window to which the emitted elements belong.

Cheers,
Aljoscha

> On 19 Feb 2016, at 13:23, lofifnc <[hidden email]> wrote:
>
> I should add i'm using version 0.10.1
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Trying-to-comprehend-rolling-windows-event-time-tp5034p5035.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.
Hi,
You're right, except that ("grace", "arctic", 25) is emitted with timestamp 90 seconds along with a watermark of 90 seconds. I followed your advice and implemented a simple window function printing the start and end of a window along with its content. You can see that a window from minute 1 till 6 is emitted containing the ("grace", "arctic", 25) triple.

window: -4 -> 1 key: (arctic)
(susi,arctic,20)
===================
window: -4 -> 1 key: (elephant)
(hans,elephant,15)
(pete,elephant,40)
===================
window: 1 -> 6 key: (arctic)
(grace,arctic,25)
===================
window: 0 -> 5 key: (elephant)
(hans,elephant,15)
(pete,elephant,40)
===================
window: 0 -> 5 key: (arctic)
(susi,arctic,20)
(grace,arctic,25)
===================
window: -1 -> 4 key: (arctic)
(susi,arctic,20)
(grace,arctic,25)
===================
window: -1 -> 4 key: (elephant)
(hans,elephant,15)
(pete,elephant,40)
===================
window: -2 -> 3 key: (arctic)
(susi,arctic,20)
(grace,arctic,25)
===================
window: -2 -> 3 key: (elephant)
(hans,elephant,15)
(pete,elephant,40)
===================
window: -3 -> 2 key: (arctic)
(susi,arctic,20)
(grace,arctic,25)
===================
window: -3 -> 2 key: (elephant)
(hans,elephant,15)
(pete,elephant,40)
===================

So what basically happens is that the windowing mechanism emits all unfinished windows when it is closed? Because, based on the watermarks alone, Flink cannot decide that the window 1 -> 6 is finished.

Best Alex.
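The window boundaries printed above are consistent with a sliding window of 5 minutes that slides every minute. That size and slide are an assumption inferred from the output, not something stated in the thread. Under that assumption, a minimal sketch in plain Java (no Flink dependency; the class `SlidingWindows` and method `assign` are hypothetical names) shows why a single record lands in five windows:

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindows {

    /**
     * Returns the [start, end) bounds, in milliseconds, of every sliding
     * window that contains the given timestamp. Window starts are assumed
     * to be aligned to multiples of the slide.
     */
    public static List<long[]> assign(long timestampMs, long sizeMs, long slideMs) {
        List<long[]> windows = new ArrayList<>();
        // Latest aligned window start that is not after the timestamp.
        long lastStart = Math.floorDiv(timestampMs, slideMs) * slideMs;
        // Walk backwards while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }

    public static void main(String[] args) {
        long size = 5 * 60_000L;  // assumed window size: 5 minutes
        long slide = 60_000L;     // assumed slide: 1 minute
        // ("grace", "arctic", 25) carries timestamp 90 seconds.
        for (long[] w : assign(90_000L, size, slide)) {
            System.out.println("window: " + w[0] / 60_000 + " -> " + w[1] / 60_000);
        }
    }
}
```

For the record at second 90 this lists the windows 1 -> 6 down to -3 -> 2, matching the five windows in which (grace,arctic,25) appears above. The records at seconds 0, 30 and 50 fall into -4 -> 1 through 0 -> 5 instead, which is why only the "grace" record shows up alone in 1 -> 6.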
Hi,
yes, in version 0.10.x there is a final implicit Long.MAX_VALUE watermark that flushes out all in-flight windows at the end. We are changing this for the 1.0 release because this behaviour can be unexpected.
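The effect of that final watermark can be illustrated with a simplified model in plain Java. This is only a sketch, not Flink's trigger code: it assumes a window counts as complete once the watermark has reached its end (Flink's exact boundary condition may differ by a millisecond), and it assumes the 5-minute window with 1-minute slide inferred from the output earlier in the thread. `WatermarkFiring` and `firedWindowStarts` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkFiring {

    /**
     * Returns the start times of all pending windows that a watermark
     * would complete, i.e. windows whose end is at or before the watermark.
     */
    public static List<Long> firedWindowStarts(List<Long> pendingStartsMs,
                                               long sizeMs, long watermarkMs) {
        List<Long> fired = new ArrayList<>();
        for (long start : pendingStartsMs) {
            if (start + sizeMs <= watermarkMs) {
                fired.add(start);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long size = 5 * 60_000L; // assumed window size: 5 minutes
        // Pending windows in the second case: starts at minute -4 through minute 1.
        List<Long> pending = new ArrayList<>();
        for (long minute = -4; minute <= 1; minute++) {
            pending.add(minute * 60_000L);
        }
        // Watermark at 90 seconds: only the window -4 -> 1 is complete.
        System.out.println(firedWindowStarts(pending, size, 90_000L));
        // Final implicit watermark: every pending window is flushed.
        System.out.println(firedWindowStarts(pending, size, Long.MAX_VALUE).size());
    }
}
```

With the watermark at 90 seconds only one window fires, while Long.MAX_VALUE completes all six at once, including 1 -> 6, which holds nothing but the "grace" record.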
On Sat, Feb 20, 2016, 17:32 lofifnc <[hidden email]> wrote: Hi,
In reply to this post by lofifnc
Hello Alex <[hidden email]>,

Many thanks for the explanation. '5 different windows': that's the key. I missed that completely. Thanks for plugging the hole; I think I understand the behaviour better now. I will follow your code snippet (gist).

Many thanks as well for sharing the write-ups by Tyler Akidau! Those are fantastic. I was not aware of them at all. Being a Stream Processing enthusiast, I probably should have been.

-- Nirmalya

--
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is where they should be. Now put the foundation under them."