Hi,

When using a BoundedOutOfOrdernessTimestampExtractor, the trigger does not fire. However, the trigger does fire when I use a custom timestamp extractor with a similar watermark. Sample code below:

1. Assigner as an anonymous class, which works fine:

AssignerWithPeriodicWatermarks<Tuple2<Rule, T>> assigner = new AssignerWithPeriodicWatermarks<Tuple2<Rule, T>>() {
    @Override

2. BoundedOutOfOrdernessTimestampExtractor assigner, which doesn't work:

AssignerWithPeriodicWatermarks<Tuple2<Rule, T>> assigner = new BoundedOutOfOrdernessTimestampExtractor<Tuple2<Rule, T>>(Time.milliseconds(100)) {
    @Override

Do you see any difference between the two approaches?

- Jayant
Hi Jayant,

The difference is that the watermarks from BoundedOutOfOrdernessTimestampExtractor are based on the greatest timestamp of all previous events. That is, if you do not receive new events, the watermark will not advance. In contrast, your custom implementation of AssignerWithPeriodicWatermarks always advances the watermark based on the wall clock.

Maybe this will already help you debug your application. If not, it would be great to see a minimal working example.

Best, Gary

On Wed, Jan 10, 2018 at 4:46 PM, Jayant Ameta <[hidden email]> wrote:
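To make the difference concrete, here is a minimal sketch that models both strategies in plain Java, without any Flink dependency. The class names (WatermarkComparison, EventDriven, WallClock) are hypothetical and chosen for illustration only; this is a simplified model of the behavior described above, not the code of either assigner.

```java
final class WatermarkComparison {

    /** Watermark trails the largest event timestamp seen so far by a fixed bound. */
    static final class EventDriven {
        private final long maxOutOfOrdernessMs;
        private long maxTimestampSeenMs;

        EventDriven(long maxOutOfOrdernessMs) {
            this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
            // Offset the initial value so the subtraction below cannot underflow.
            this.maxTimestampSeenMs = Long.MIN_VALUE + maxOutOfOrdernessMs;
        }

        void onEvent(long eventTimestampMs) {
            maxTimestampSeenMs = Math.max(maxTimestampSeenMs, eventTimestampMs);
        }

        long currentWatermark() {
            return maxTimestampSeenMs - maxOutOfOrdernessMs;
        }
    }

    /** Watermark trails the machine clock by a fixed delay, regardless of events. */
    static final class WallClock {
        private final long delayMs;

        WallClock(long delayMs) {
            this.delayMs = delayMs;
        }

        long currentWatermark(long wallClockMs) {
            return wallClockMs - delayMs;
        }
    }

    public static void main(String[] args) {
        EventDriven eventDriven = new EventDriven(100);
        WallClock wallClock = new WallClock(100);

        // A fixed set of events, as in a test with a finite input.
        for (long ts : new long[] {1_000, 1_500, 1_200}) {
            eventDriven.onEvent(ts);
        }

        // No further events arrive: the event-driven watermark stays at 1500 - 100.
        System.out.println("event-driven: " + eventDriven.currentWatermark()); // 1400
        // The wall-clock watermark keeps moving as the clock moves.
        System.out.println("wall clock:   " + wallClock.currentWatermark(60_000)); // 59900
    }
}
```

In this model, a window ending at, say, 2000 would never be closed by the event-driven watermark unless a later event arrives, while the wall-clock watermark passes 2000 on its own.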
|
Thanks Gary,

I was only testing with a fixed set of events, so the watermark was not advancing, like you said.

Jayant Ameta

On Thu, Jan 11, 2018 at 3:36 PM, Gary Yao <[hidden email]> wrote:
|
Another thing to point out is that watermarks are usually data-driven, i.e., they depend on the timestamps of the events and not on the clock of the machine. Otherwise, you might observe a lot of late data, i.e., events with timestamps smaller than the last watermark.

2018-01-11 11:49 GMT+01:00 Jayant Ameta <[hidden email]>:
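The late-data effect can be illustrated with a small, Flink-free sketch. It counts how many events have a timestamp smaller than the watermark in force when they arrive, once for a clock-driven watermark and once for a data-driven one. The class and method names (LateEventCount, lateWithWallClock, lateWithEventDriven) and the sample timestamps are assumptions made up for this example.

```java
final class LateEventCount {

    /** Late count when the watermark trails the arrival clock by delayMs. */
    static int lateWithWallClock(long[] eventTimestampsMs, long[] arrivalClockMs, long delayMs) {
        int late = 0;
        for (int i = 0; i < eventTimestampsMs.length; i++) {
            long watermark = arrivalClockMs[i] - delayMs;
            if (eventTimestampsMs[i] < watermark) {
                late++;
            }
        }
        return late;
    }

    /** Late count when the watermark trails the largest timestamp seen so far by boundMs. */
    static int lateWithEventDriven(long[] eventTimestampsMs, long boundMs) {
        int late = 0;
        // Offset the initial value so the subtraction below cannot underflow.
        long maxSeenMs = Long.MIN_VALUE + boundMs;
        for (long ts : eventTimestampsMs) {
            long watermark = maxSeenMs - boundMs;
            if (ts < watermark) {
                late++;
            }
            maxSeenMs = Math.max(maxSeenMs, ts);
        }
        return late;
    }

    public static void main(String[] args) {
        // Events reach the operator roughly 300 ms after they occurred.
        long[] eventTimestampsMs = {1_000, 1_050, 1_020, 1_100};
        long[] arrivalClockMs = {1_300, 1_350, 1_360, 1_400};

        System.out.println(lateWithWallClock(eventTimestampsMs, arrivalClockMs, 100)); // 4
        System.out.println(lateWithEventDriven(eventTimestampsMs, 100)); // 0
    }
}
```

With the clock-driven watermark, every event in this sample is already behind the watermark when it arrives, because the delay between event time and arrival exceeds the 100 ms allowance. The data-driven watermark only depends on the spread between event timestamps, so the same events are all on time.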
|
Hi Fabian,

I want to extract timestamps from my events. However, the event stream can be sparse at times (e.g. 2 days without any events). What's the best strategy for creating watermarks if I want real-time processing of the events that enter the stream?

Jayant Ameta

On Thu, Jan 11, 2018 at 4:53 PM, Fabian Hueske <[hidden email]> wrote:
|
This depends on the requirements of your application. With the usual watermark generation strategies, which are purely data-driven, a stream that does not produce data does not advance its watermarks. This might be fine if your program consumes a single stream: if that stream does not produce data, your program has nothing to compute (although there might still be data left, such as a window, that has not been computed yet).

The situation becomes trickier if your program has multiple sources that become inactive at some point, or a source where a partition can become inactive. AFAIK, there is a mechanism to mark partitions (and maybe complete sources) as inactive. @Gordon (in CC) knows more about this feature.

Best, Fabian

2018-01-15 14:51 GMT+01:00 Jayant Ameta <[hidden email]>:
|
A while back I wrote this slightly more elaborate extractor that will advance the watermark independently after the stream is idle for a while: https://github.com/aljoscha/flink/blob/6e4419e550caa0e5b162bc0d2ccc43f6b0b3860f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor.java
Best, Aljoscha
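
For readers who want the general idea without opening the link, the following is a rough, Flink-free sketch of one way to combine a data-driven watermark with an idle timeout. It is not the code of the linked extractor; the class name IdleAwareWatermark, the idleTimeoutMs parameter, and the rule "advance by the idle time beyond the timeout" are assumptions made for this illustration. The wall clock is passed in explicitly so the behavior is easy to test.

```java
final class IdleAwareWatermarkSketch {

    static final class IdleAwareWatermark {
        private final long maxOutOfOrdernessMs;
        private final long idleTimeoutMs;
        private long maxTimestampSeenMs;
        private long lastEventClockMs;
        private boolean seenEvent = false;
        private long lastEmittedMs = Long.MIN_VALUE;

        IdleAwareWatermark(long maxOutOfOrdernessMs, long idleTimeoutMs) {
            this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
            this.idleTimeoutMs = idleTimeoutMs;
        }

        /** Records an event timestamp together with the wall-clock time of its arrival. */
        void onEvent(long eventTimestampMs, long wallClockMs) {
            maxTimestampSeenMs = seenEvent
                    ? Math.max(maxTimestampSeenMs, eventTimestampMs)
                    : eventTimestampMs;
            lastEventClockMs = wallClockMs;
            seenEvent = true;
        }

        /** Data-driven while events flow; advances with the clock once the stream is idle. */
        long currentWatermark(long wallClockMs) {
            if (!seenEvent) {
                return Long.MIN_VALUE;
            }
            long watermark = maxTimestampSeenMs - maxOutOfOrdernessMs;
            long idleForMs = wallClockMs - lastEventClockMs;
            if (idleForMs > idleTimeoutMs) {
                // Assumption: event time and wall-clock time progress at the same rate.
                watermark += idleForMs - idleTimeoutMs;
            }
            // Watermarks must never move backwards.
            lastEmittedMs = Math.max(lastEmittedMs, watermark);
            return lastEmittedMs;
        }
    }

    public static void main(String[] args) {
        IdleAwareWatermark wm = new IdleAwareWatermark(100, 1_000);
        wm.onEvent(5_000, 10_000);
        System.out.println(wm.currentWatermark(10_500)); // 4900, still data-driven
        System.out.println(wm.currentWatermark(13_000)); // 6900, idle for 3000 ms
    }
}
```

The trade-off in this design is that an event arriving after an idle period may be late if its timestamp is below the watermark that was advanced in the meantime, so the timeout has to be chosen with the expected gaps in the stream in mind.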
|