Hi,

I have created a TimestampAssigner as follows. I want to use monitoring.getEventTimestamp() with event-time processing and collect aggregated stats over time window intervals of 5 secs, 5 mins, etc. Is this the right way to create the timestamp/watermark assigner with a bound? I want to collect the stats for each eventTimestamp + window interval.

My understanding: the generated watermark, which is eventTimestamp + bound, will collect all the events whose timestamps arrive within that watermark, inside each eventTimestamp + 5 s (etc.) window interval. Or does this bound have to be based on the window interval, i.e. extractedTimestamp + windowInterval + bound?

public class MonitoringTSWAssigner implements AssignerWithPunctuatedWatermarks<Monitoring> {

Used here:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

TIA,
Hi,

1. From the doc [1]: a Watermark(t) declares that event time has reached time t in that stream, meaning that there should be no more elements from the stream with a timestamp t' <= t (i.e. events with timestamps older than or equal to the watermark). So I think it is counterintuitive to generate a watermark that is larger than the timestamp of the current element. At the least, you should subtract the bound instead of adding it.
2. From the definition of a watermark, I think the watermark is not related to the length of the window. The bound depends on your application.
3. In your case AssignerWithPunctuatedWatermarks might not be a good choice. Watermarks are not free, and you might send too many of them. If your source can generate some kind of "watermark" element, I think you could use that interface. Otherwise you could choose AssignerWithPeriodicWatermarks. You can find an example in the doc [2].

On Wed, Apr 10, 2019 at 7:41 AM, Vijay Balakrishnan <[hidden email]> wrote:
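To make points 1 and 2 above concrete, here is a minimal plain-Java sketch. It does not use Flink's classes; the class name, method names, and the bound value in the usage note are made up for illustration. It assumes the usual bounded-out-of-orderness scheme (watermark = highest timestamp seen so far minus the bound) and a simplified firing rule for a tumbling event-time window (fire once the watermark reaches the window's last timestamp, end - 1).

```java
/** Plain-Java model of a bounded-out-of-orderness watermark; hypothetical, not a Flink class. */
final class BoundedWatermarkSketch {
    private final long boundMillis;                      // how late an event may arrive (application-specific)
    private long currentMaxTimestamp = Long.MIN_VALUE;   // highest event timestamp seen so far

    BoundedWatermarkSketch(long boundMillis) {
        this.boundMillis = boundMillis;
    }

    /** Record the event timestamp of an incoming element (may arrive out of order). */
    void onEvent(long eventTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    /** Watermark = max timestamp seen minus the bound; the window length plays no role here. */
    long currentWatermark() {
        if (currentMaxTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // nothing seen yet, avoid underflow
        }
        return currentMaxTimestamp - boundMillis;
    }

    /** Simplified rule: window [start, start + size) may fire once watermark >= start + size - 1. */
    static boolean windowCanFire(long windowStart, long windowSize, long watermark) {
        return watermark >= windowStart + windowSize - 1;
    }
}
```

With a bound of 2000 ms and events at 1000, 5500, and 4000, the watermark is 3500, so the 5-second window [0, 5000) is still open. Once an event at 7000 arrives, the watermark becomes 5000 and that window can fire. The bound only delays when the window fires; it is never added to the window interval.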
Hi Guowei,

Thanks for your reply. I am trying to understand the logic behind point 1, i.e. the current watermark being currMaxTimestamp minus the bound. Does this mean that the operator processing a task has current event time < current watermark < currMaxTimestamp? Does the operator then progress to the next watermark as the starting point for events once event time reaches currWatermark?

Also, I saw this comment in BoundedOutOfOrdernessTimestampExtractor.java:

// this guarantees that the watermark never goes backwards.

How does it guarantee that the watermark never goes backwards?

TIA,
Vijay

On Tue, Apr 9, 2019 at 10:50 PM Guowei Ma <[hidden email]> wrote:
Hi Vijay,

>>> Then the Operator progresses to the next Watermark as a starting point for events after event time reaches currWatermark ?

AFAIK, the operator that generates watermarks is called by the framework. When it is called depends on the operator itself. For example, an operator that implements the AssignerWithPunctuatedWatermarks interface is called for every element.

>>> How does it guarantee that watermark never goes backwards ?

Whether a watermark generated by AssignerWithPunctuatedWatermarks/AssignerWithPeriodicWatermarks is sent downstream is controlled by the framework. If an operator returns a watermark that goes backwards, Flink would send it downstream.

Best,
Guowei

On Wed, Apr 10, 2019 at 11:44 PM, Vijay Balakrishnan <[hidden email]> wrote:
Sorry for missing a "not". :(

Whether a watermark generated by AssignerWithPunctuatedWatermarks/AssignerWithPeriodicWatermarks is sent downstream is controlled by the framework. If an operator returns a watermark that goes backwards, Flink would _not_ send it downstream.

Best,
Guowei

On Mon, Apr 15, 2019 at 9:44 AM, Guowei Ma <[hidden email]> wrote:
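The "never goes backwards" behaviour described above can be pictured with a small guard like the following. This is only a plain-Java sketch of the idea, not Flink's actual operator code; the class and method names are hypothetical. It assumes that a candidate watermark is forwarded only when it is strictly greater than the last one forwarded.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical guard that forwards only watermarks which advance event time. */
final class MonotonicWatermarkFilter {
    private long lastEmitted = Long.MIN_VALUE;          // last watermark forwarded downstream
    private final List<Long> emitted = new ArrayList<>();

    /** Returns true if the candidate was forwarded, false if it was dropped. */
    boolean offer(long candidateWatermark) {
        if (candidateWatermark > lastEmitted) {
            lastEmitted = candidateWatermark;
            emitted.add(candidateWatermark);
            return true;
        }
        return false; // equal or older watermark: silently ignored
    }

    /** Watermarks that were actually forwarded, in order. */
    List<Long> emitted() {
        return emitted;
    }
}
```

If an assigner returns 1000, then 800, then 1500, only 1000 and 1500 would reach downstream operators in this model, so event time as seen downstream only moves forward.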