Hi,
I'm a bit confused about how Flink deals with late elements after the introduction of allowedLateness to windows. What is the difference between using a BoundedOutOfOrdernessTimestampExtractor(Time.seconds(X)) and allowedLateness(Time.seconds(X))? What if one is used and the other is not? And what if a different lateness is used in each one? Could you please clarify it on the basis of a simple example?

Thank you.

Best, Yassine
Hi Yassine,

the difference is the following:

1) A timestamp extractor tells Flink when an event happened, i.e., it extracts a timestamp from the event. A watermark assigner tells Flink what the current logical time is. The BoundedOutOfOrdernessTimestampExtractor (BOOTE) does both: it extracts timestamps and emits watermarks that trail the highest timestamp seen so far by the configured bound. A record whose timestamp is lower than the last watermark is considered to be late.

2) The allowedLateness parameter of time windows tells Flink how long to keep state around after the window was evaluated. If data arrives after the evaluation and before the allowedLateness has passed, the window function is applied again and an update is sent out.

Let's look at an example. Assume you have a BOOTE with a 2 minute bound and a 10 minute tumbling window that starts at 12:00 and ends at 12:10. If you have the following data:

12:01, A
12:04, B
WM, 12:02 // 12:04 - 2 minutes
12:02, C
12:08, D
12:14, E
WM, 12:12 // 12:14 - 2 minutes
12:16, F
WM, 12:14 // 12:16 - 2 minutes
12:09, G

== no allowed lateness
The window operator forwards the logical time to 12:12 when it receives <WM, 12:12> and evaluates the window, which contains [A, B, C, D] at this time, and finally purges its state. <12:09, G> is later ignored.

== allowed lateness of 3 minutes
The window operator evaluates the window when <WM, 12:12> is received, but its state is not purged yet. The state is purged when <WM, 12:14> is received (window end 12:10 + 3 minutes allowed lateness = 12:13). <12:09, G> is again ignored.

== allowed lateness of 5 minutes
The window operator evaluates the window when <WM, 12:12> is received, but its state is not purged yet. When <12:09, G> is received, the window is evaluated again, this time with [A, B, C, D, G], and an update is sent out. The state is purged when a watermark of >= 12:15 is received.

So, watermarks tell Flink what time it is, and allowed lateness tells the system when state should be discarded and all later-arriving data ignored. These issues are related but not exactly the same thing. For instance, you can counter late data by increasing either the watermark bound or the lateness parameter. Increasing the watermark bound yields higher latencies, because windows are evaluated later. Configuring allowedLateness allows for earlier results, but you have to cope with the updates downstream.

Please let me know if you have questions.

Best, Fabian

2016-10-17 11:52 GMT+02:00 Yassine MARZOUGUI:
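To make the example easier to trace, here is a minimal plain-Python sketch that replays the stream above for the three allowed-lateness settings. It is not Flink code and uses no Flink API: STREAM, minutes, hhmm and replay are hypothetical helpers, times are whole minutes, the watermarks are copied from the example rather than generated by a BOOTE, and the trigger is assumed to behave like a FIRE trigger (evaluate once the watermark passes the window end, evaluate again for every late element that arrives within allowedLateness).

```python
# Hypothetical plain-Python model of the example, not Flink code.
# Times are whole minutes since midnight; watermarks are copied from the
# example instead of being generated by a BOOTE.

STREAM = [  # (timestamp, value) records and ("WM", time) watermarks
    ("12:01", "A"), ("12:04", "B"), ("WM", "12:02"),
    ("12:02", "C"), ("12:08", "D"), ("12:14", "E"), ("WM", "12:12"),
    ("12:16", "F"), ("WM", "12:14"), ("12:09", "G"),
]


def minutes(s: str) -> int:
    """'12:09' -> 729 minutes since midnight."""
    h, m = map(int, s.split(":"))
    return h * 60 + m


def hhmm(m: int) -> str:
    """729 -> '12:09'."""
    return f"{m // 60:02d}:{m % 60:02d}"


def replay(allowed_lateness: int, size: int = 10) -> list[str]:
    """Replay STREAM through tumbling windows with an assumed FIRE-style trigger."""
    windows: dict[int, list[str]] = {}  # window start -> values kept in state
    fired: set[int] = set()             # windows evaluated at least once
    wm = -1                             # no watermark received yet
    log: list[str] = []

    def fire(start: int) -> None:
        fired.add(start)
        log.append(f"fire {hhmm(start)} [{', '.join(windows[start])}] at WM {hhmm(wm)}")

    for ts, value in STREAM:
        if ts == "WM":
            wm = minutes(value)
            for start in sorted(windows):
                end = start + size
                if start not in fired and wm >= end:  # watermark passed window end
                    fire(start)
                if wm >= end + allowed_lateness:      # lateness used up: purge state
                    del windows[start]
                    log.append(f"purge {hhmm(start)} at WM {hhmm(wm)}")
            continue
        start = minutes(ts) - minutes(ts) % size      # tumbling window assignment
        end = start + size
        if wm >= end + allowed_lateness:              # state already gone: ignore
            log.append(f"drop {value}")
            continue
        windows.setdefault(start, []).append(value)
        if wm >= end:                                 # late, but within allowedLateness
            fire(start)
    return log


for lateness in (0, 3, 5):
    print(lateness, replay(lateness))
```

With 0 minutes the window fires and is purged at WM 12:12 and G is dropped; with 3 minutes the purge moves to WM 12:14 and G is still dropped; with 5 minutes G causes a second evaluation with [A, B, C, D, G] and no purge happens, because no watermark >= 12:15 arrives in this stream.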
I have to extend my answer: the allowedLateness behavior that I described applies only if the window trigger returns FIRE when the window is evaluated (this is the default behavior of most triggers). In case the trigger returns FIRE_AND_PURGE, the state of the window is purged when the function is evaluated and late events are processed alone, i.e., in my example <12:09, G> would be processed without [A, B, C, D].

2016-10-17 16:24 GMT+02:00 Fabian Hueske:
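The FIRE versus FIRE_AND_PURGE distinction can be traced with a small plain-Python model. It is hypothetical and not Flink's Trigger API: it tracks only the window [12:00, 12:10) from the example with 5 minutes of allowed lateness, and the purge_on_fire flag stands in for the two trigger results.

```python
# Hypothetical plain-Python model, not Flink's Trigger API. Only window
# [12:00, 12:10) is tracked (E and F belong to the next window), with
# 5 minutes of allowed lateness; times are whole minutes since midnight.

STREAM = [("12:01", "A"), ("12:04", "B"), ("WM", "12:02"), ("12:02", "C"),
          ("12:08", "D"), ("WM", "12:12"), ("WM", "12:14"), ("12:09", "G")]


def minutes(s: str) -> int:
    h, m = map(int, s.split(":"))
    return h * 60 + m


def evaluations(purge_on_fire: bool, allowed_lateness: int = 5) -> list[list[str]]:
    """Contents seen by the window function at each evaluation of [12:00, 12:10)."""
    end = minutes("12:10")
    state: list[str] = []
    results: list[list[str]] = []
    wm, fired = -1, False  # no watermark yet, window not evaluated yet

    def fire() -> None:
        results.append(list(state))  # copy of what the window function sees
        if purge_on_fire:            # FIRE_AND_PURGE: clear contents after evaluating
            state.clear()

    for ts, value in STREAM:
        if ts == "WM":
            wm = minutes(value)
            if not fired and wm >= end:        # watermark passed the window end
                fired = True
                fire()
        elif wm < end + allowed_lateness:      # window state not cleaned up yet
            state.append(value)
            if wm >= end:                      # late element: evaluate again
                fire()
    return results


print(evaluations(purge_on_fire=False))  # FIRE
print(evaluations(purge_on_fire=True))   # FIRE_AND_PURGE
```

With FIRE the late element G produces an updated result [A, B, C, D, G]; with FIRE_AND_PURGE the earlier contents were already cleared, so G is evaluated alone as [G].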
Hi Fabian, thank you very much for the great answer and example; I appreciate it! It is all clear now.

Best, Yassine

2016-10-17 16:29 GMT+02:00 Fabian Hueske: