Hi All,
The watermark is monotonically increasing within a stream, correct?

Given an extremely out-of-order stream, e.g.:

e4(12:04:33) --> e3(15:00:22) --> e2(12:04:21) --> e1(12:03:01)

Here e1 appears first, so the watermark starts from 12:03:01, and e3 is an early event; it would be placed in another window and fired separately, correct? If so, the result is not bad.

The worse case is:

e4(12:04:33) --> e3(12:03:01) --> e2(12:04:21) --> e1(15:00:22)

Then e2, e3, and e4 would be considered late events and get discarded? And the watermark would be stuck at a wrong value permanently?

So the stream must not be that out-of-order, otherwise Flink cannot handle it well?
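The effect of the two arrival orders can be replayed with a small simulation. This is deliberately not Flink code, just a plain-Python sketch of a bounded-out-of-orderness watermark (watermark = highest timestamp seen so far minus a fixed delay), with a simplified lateness rule: an element counts as late if its timestamp is not ahead of the current watermark (Flink actually compares the watermark against the end of the element's window).

```python
# Plain-Python simulation (not Flink code) of a bounded-out-of-orderness
# watermark: watermark = max event time seen so far minus a fixed delay.
from datetime import datetime, timedelta

def hms(s):
    """Parse 'HH:MM:SS' into a datetime on an arbitrary fixed day."""
    return datetime.strptime(s, "%H:%M:%S")

def classify(arrivals, delay=timedelta(seconds=10)):
    """Return (timestamp, 'ok'|'late') per element, in arrival order."""
    max_ts = None
    result = []
    for s in arrivals:
        ts = hms(s)
        wm = None if max_ts is None else max_ts - delay
        late = wm is not None and ts <= wm
        result.append((s, "late" if late else "ok"))
        if max_ts is None or ts > max_ts:
            max_ts = ts  # the watermark only ever moves forward
    return result

# First sequence: e1 arrives first, e3 (15:00:22) is merely "early".
print(classify(["12:03:01", "12:04:21", "15:00:22", "12:04:33"]))
# Worst case: the outlier (15:00:22) arrives first and drags the
# watermark far ahead, so every subsequent element is late.
print(classify(["15:00:22", "12:04:21", "12:03:01", "12:04:33"]))
```

Note that even in the first sequence the early e3 drags the watermark forward, so e4 already comes out late; in the second sequence everything after the outlier is late, as the question suspects.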
Hi,

this depends on how you generate watermarks [1]. You could generate watermarks with a four hour delay and be fine (at the cost of four hours of latency), or add checks so that you never advance the watermark by more than x minutes at a time. These considerations are quite use-case specific, so it's hard to give advice that applies to all cases.

There are also different strategies for handling late data in windows: you can drop it (the default behavior), update previously emitted results (allowed lateness) [2], or emit it to a side output [3].

Flink is quite flexible when dealing with watermarks and late data.

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_timestamps_watermarks.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#allowed-lateness
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#getting-late-data-as-a-side-output

2017-12-12 10:16 GMT+01:00 Jinhua Luo <[hidden email]>:
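The allowed-lateness and side-output behaviors described in [2] and [3] can be sketched in plain Python (not Flink code) for a single event-time window: the window fires when the watermark passes its end, keeps its state for the allowed lateness, re-fires with an updated result when a late element arrives inside that period, and diverts anything later to a side output.

```python
# Plain-Python sketch (not Flink code) of allowed-lateness semantics
# for a single event-time window.

def run(window_end, allowed_lateness, stream):
    """stream: list of (kind, value) where kind is 'event' (value = a
    timestamp inside this window) or 'watermark' (value = new watermark)."""
    state = []          # elements currently held in the window's state
    fired = []          # results emitted downstream (one list per firing)
    side_output = []    # too-late elements
    watermark = float("-inf")
    has_fired = False
    for kind, value in stream:
        if kind == "watermark":
            watermark = max(watermark, value)
            if not has_fired and watermark >= window_end:
                fired.append(list(state))   # initial firing
                has_fired = True
            if watermark >= window_end + allowed_lateness:
                state = []                  # state is garbage-collected
        else:
            if watermark < window_end:
                state.append(value)         # on time (or early)
            elif watermark < window_end + allowed_lateness:
                state.append(value)
                fired.append(list(state))   # late firing: updated result
            else:
                side_output.append(value)   # too late: side output

    return fired, side_output

fired, side = run(window_end=10, allowed_lateness=5, stream=[
    ("event", 1), ("event", 2), ("watermark", 10),  # fires [1, 2]
    ("event", 3),                                   # late: re-fires [1, 2, 3]
    ("watermark", 20), ("event", 4),                # state gone: side output
])
print(fired, side)  # [[1, 2], [1, 2, 3]] [4]
```

As option 2) implies, a downstream consumer of `fired` must be able to handle the updated result for the same window.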
Yes, I know Flink is flexible.

But I am wondering: when the event sequence is a mess (e.g. branches of time-series events interleaved, with each branch covering a completely different time period), it is hard to fit them into the streaming API, because no matter how you generate the watermark, it cannot move backwards or branch.

Is there any best practice for handling late and/or early events?

2017-12-12 18:24 GMT+08:00 Fabian Hueske <[hidden email]>:
Early events are usually not an issue, because they can be kept in state until they are ready to be processed. Also, depending on the watermark assigner, an early event often pushes the watermark ahead such that it is no longer early but all other events become late.

Handling of late events depends on your use case, and there are the three options I already listed:

1) dropping
2) keeping the state of "completed" computations for some time (allowed lateness). If a late event arrives, you can update the result and emit an update. In this case your downstream operators and systems have to be able to deal with updates.
3) sending the late events to a different channel via side outputs and handling them later.

2017-12-12 12:14 GMT+01:00 Jinhua Luo <[hidden email]>:
Consider a normally ordered stream: if an abnormal event A appears and advances the watermark, making all subsequent normal events (which are earlier than A) late, I think that is a problem. The options you listed cannot fix it: the normal events must not be dropped, the allowed lateness may be hard to determine (it depends on the timestamp of the abnormal event), and re-triggering the window brings side effects downstream.

If the abnormal event appears in the middle of the stream, perhaps we could filter it out by checking the delta against the previous element, but what if the abnormal event is the first event emitted by the source?

2017-12-12 19:25 GMT+08:00 Fabian Hueske <[hidden email]>:
As I said before, you can solve that with a custom WatermarkAssigner: collect a histogram, take the median of the last X samples, ignore outliers, etc.

2017-12-12 13:37 GMT+01:00 Jinhua Luo <[hidden email]>:
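A plain-Python sketch of that idea (not a real Flink assigner; the sample size and delay are made-up numbers): derive the watermark from the median of the last X timestamps instead of the maximum, so one abnormal event, even the very first one, cannot drag the watermark far ahead.

```python
# Plain-Python sketch (not a Flink WatermarkAssigner) of an
# outlier-robust watermark based on the median of recent timestamps.
from collections import deque
from statistics import median

class MedianWatermark:
    def __init__(self, sample_size=5, delay=10):
        self.samples = deque(maxlen=sample_size)  # last X timestamps
        self.delay = delay
        self.watermark = float("-inf")

    def on_event(self, timestamp):
        self.samples.append(timestamp)
        candidate = median(self.samples) - self.delay
        # a watermark must never move backwards
        self.watermark = max(self.watermark, candidate)
        return self.watermark
```

Feeding it 100, 101, 102, then an outlier of 100000, then 103 leaves the watermark near the normal timestamps: a single outlier is one sample out of five, so it never becomes the median.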
To add to Fabian's comment: what can be done (and it may not be the norm) is to keep a 95-99% quantile of the difference between server (or ingestion) time and event time, using an approximate histogram so the computation stays cheap, and use that as the max out-of-orderness. We keep the last n of these quantile values, each representing x elements, and choose the smallest. What we figured is that an upper bound derived from the real distribution is more palatable than some arbitrary value. Late data then becomes inconsequential, because the lateness has already been incorporated into the delayed watermark generation.

Vishal.

On Tue, Dec 12, 2017 at 7:46 AM, Fabian Hueske <[hidden email]> wrote:
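A rough plain-Python sketch of this scheme (the exact quantile and the windowing over n values are assumptions, and an exact sorted list stands in for the approximate histogram mentioned above):

```python
# Plain-Python sketch: use a high quantile of the observed
# (ingestion time - event time) differences as the max out-of-orderness.

def quantile(values, q):
    """Nearest-rank quantile of a non-empty list, q in (0, 1]."""
    ordered = sorted(values)
    idx = max(0, int(q * len(ordered)) - 1)
    return ordered[idx]

def max_out_of_orderness(ingest_event_pairs, q=0.95):
    """ingest_event_pairs: list of (ingestion_time, event_time).
    Returns a delay that covers q of the observed lateness."""
    diffs = [ingest - event for ingest, event in ingest_event_pairs]
    return quantile(diffs, q)
```

In Vishal's full scheme this quantile would be recomputed per batch of x elements, and the smallest of the last n results used as the watermark delay.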
A couple of other details worth mentioning:

1. Your choice of connector matters a lot. Some connectors provide limited ordering guarantees, especially across keys, which leads to a highly disordered stream of events (with respect to event time) from the perspective of the watermark generator. Some connectors provide mitigations; for example, the Kafka connector supports a per-partition watermark generator.

2. When using `assignTimestampsAndWatermarks`, Flink creates a separate instance of your watermark generator for each instance of the operator to which it is assigned.

3. Watermark generators may contain private fields, but their state isn't checkpointed (to my knowledge).

Hope this helps,
Eron

On Tue, Dec 12, 2017 at 7:53 AM, Vishal Santoshi <[hidden email]> wrote:
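Point 1 can be illustrated with a plain-Python sketch (not connector code) of the per-partition idea: each partition tracks its own maximum timestamp, and the consumer emits the minimum across partitions, so an outlier confined to one partition cannot advance the overall watermark on its own.

```python
# Plain-Python sketch of per-partition watermark generation: the
# combined watermark is the minimum over the per-partition watermarks.

class PerPartitionWatermarks:
    def __init__(self, num_partitions, delay=0):
        self.max_ts = [float("-inf")] * num_partitions
        self.delay = delay

    def on_event(self, partition, timestamp):
        self.max_ts[partition] = max(self.max_ts[partition], timestamp)
        return self.combined()

    def combined(self):
        # Flink-style merge: an operator's watermark is the minimum of
        # the watermarks of all its inputs (here, partitions).
        return min(self.max_ts) - self.delay
```

With two partitions, an outlier timestamp of 100000 on partition 0 leaves the combined watermark at partition 1's progress; only when every partition advances does the watermark follow.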
|