Hi Experts,
I am trying to implement a KeyedProcessFunction with an onTimer() callback. I need to use event time, and I am having trouble making the watermark available to my operator; I am seeing some strange behavior. I have a joined retract stream without watermark or timestamp information, and I need to assign timestamps and watermarks to it. The timestamp is just a field in the stream. My problems are with the watermark generator:

1. I can use a time-lag watermark generator and make it work. But with a bounded-out-of-orderness generator, context.timerService().currentWatermark() in processElement() always sticks to the initial value and never updates.
2. I set the auto-watermark interval to 5 seconds for debugging purposes, and I attach this watermark generator in only one place, with parallelism 1. However, I am getting 8 records at a time. The time-lag policy advances all 8 records, while the out-of-orderness policy advances only 1 record. Maybe this mismatch is causing processElement() to capture the wrong default watermark?
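For reference, the two policies reduce to the arithmetic below. This is a minimal plain-Java sketch with no Flink dependency: the class and method names are hypothetical, the 15000 ms lag and the initial maximum of 15000 are the constants from the generator in this post, and nowMillis is an illustrative wall-clock value, not a measured one.

```java
final class WatermarkPolicies {
    // Policy 1 (time lag): the larger of "wall clock minus lag" and the
    // highest event timestamp seen so far.
    static long timeLagWatermark(long nowMillis, long maxTimeLag, long currentMaxTimestamp) {
        return Math.max(nowMillis - maxTimeLag, currentMaxTimestamp);
    }

    // Policy 2 (bounded out-of-orderness): highest event timestamp seen so far minus the lag.
    static long boundedOutOfOrdernessWatermark(long maxTimeLag, long currentMaxTimestamp) {
        return currentMaxTimestamp - maxTimeLag;
    }

    public static void main(String[] args) {
        long maxTimeLag = 15000L;
        long initialMax = 15000L;            // initial value of currentMaxTimestamp in the post
        long seenEvent = 1605047187881L;     // an event timestamp taken from the log
        long nowMillis = 1605047281198L;     // illustrative wall-clock reading (assumption)

        // An instance that has seen no events:
        System.out.println(timeLagWatermark(nowMillis, maxTimeLag, initialMax));      // 1605047266198
        System.out.println(boundedOutOfOrdernessWatermark(maxTimeLag, initialMax));   // 0

        // An instance that has seen the event:
        System.out.println(timeLagWatermark(nowMillis, maxTimeLag, seenEvent));       // 1605047266198
        System.out.println(boundedOutOfOrdernessWatermark(maxTimeLag, seenEvent));    // 1605047172881
    }
}
```

The point of the sketch is that policy 1 moves forward with the wall clock even when no event arrives, whereas policy 2 can only move when onEvent() is called on that particular generator instance.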
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator

This is my code for the watermark generator:

    @Slf4j
    public class PeriodicTableOutputWatermarkGenerator implements WatermarkGenerator<Tuple2<Boolean, Row>> {
        private final long maxTimeLag = 15000;
        private transient long currentMaxTimestamp = 15000;

        @Override
        public void onEvent(Tuple2<Boolean, Row> booleanRowTuple2, long eventTimestamp, WatermarkOutput output) {
            // the eventTimestamp is obtained through the TimestampAssigner
            // https://github.com/apache/flink/blob/f24cb3f3b7e773706188ae92998b3e1ffbf1829e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
            currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp);
            log.info("eventTimestamp in onEvent method: {}", eventTimestamp);
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // Policy 1: timelag strategy, can work and advance the timestamp
            long watermarkEpochTime = Math.max(System.currentTimeMillis() - maxTimeLag, currentMaxTimestamp);
            output.emitWatermark(new Watermark(watermarkEpochTime));

            // Policy 2: periodic emit based on event
            long periodicEmitWatermarkTime = currentMaxTimestamp - maxTimeLag;
            // output.emitWatermark(new Watermark(periodicEmitWatermarkTime));

            log.info("Emit Watermark: watermark based on system time: {}, periodicEmitWatermarkTime: {}, currentMaxTimestamp: {}",
                    watermarkEpochTime, periodicEmitWatermarkTime, currentMaxTimestamp);
        }
    }

This is the log printed by the slf4j call above. Every time, it gives me 8 records. Why 8? I think it should be 1 in theory. I am very confused. Also, policy 1 advances all 8 records, while policy 2 advances only 1 of the 8 records and the change is not reflected in processElement().
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266199, periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp: 1605047187881
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266199, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp: 1605047187881
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000

Any insights? Thank you very much!

Best,
Fuyao
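One reading of the log above, offered as an assumption rather than a confirmed diagnosis: the 8 lines per round could come from 8 separate generator instances, with only one of them ever receiving events. What is documented Flink behavior is that a downstream operator's event-time clock is the minimum of the watermarks received on its input channels. The plain-Java sketch below (hypothetical class and method names, no Flink dependency) applies that rule to the per-line values from the first round of the log.

```java
import java.util.Arrays;

final class CombinedWatermark {
    // A downstream operator advances its event-time clock to the minimum
    // of the latest watermark seen on each input channel.
    static long combine(long[] perChannelWatermarks) {
        return Arrays.stream(perChannelWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Policy 2 values from the 14:28:01 round (periodicEmitWatermarkTime column).
        long[] policy2 = {0L, 1605047172881L, 0L, 0L, 0L, 0L, 0L, 0L};

        // Policy 1 values from the same round ("watermark based on system time" column).
        long[] policy1 = {
            1605047266198L, 1605047266199L, 1605047266199L, 1605047266198L,
            1605047266198L, 1605047266198L, 1605047266198L, 1605047266198L
        };

        System.out.println(combine(policy2)); // 0
        System.out.println(combine(policy1)); // 1605047266198
    }
}
```

Under that assumption, the combined watermark for policy 2 stays at its starting value because seven of the eight inputs never move, while policy 1 advances because every input follows the wall clock.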
Hi Community,
Regarding this problem, could someone give me an explanation? Thanks.

Best,
Fuyao

On 11/10/20 16:56, [hidden email] wrote:
Hi All,

Just to add a little more context to the problem: I have a full outer join operation before this stage. The source of the full outer join is a Kafka source. I also assigned timestamps and watermarks on the FlinkKafkaConsumer. That made no difference to the result; I still cannot get the watermark to advance.

Overall workflow: two Kafka topics -> two data streams in Flink -> join them together and convert to a retract stream -> apply a KeyedProcessFunction that schedules an event-time timer and runs the onTimer logic -> push to the downstream sink.

I think there are no issues with my syntax, but I still could NOT get the watermark to advance for event time using the bounded-out-of-orderness strategy. (In the Flink cluster the behavior is different: the watermark advances, but onTimer is still not triggered correctly. :( )

I guess the reason is that I receive 8 records for each round of onPeriodicEmit(), and only one of the eight is updated under the BoundedOutOfOrderness strategy. With the time-lag strategy (refer to the first email in the thread), all of them are updated, so the watermark advances. I just don't know why I get 8 records every time even though I have parallelism set to 1. (The logs can be found in the first email in the thread.)

I also tried to debug in the Flink web interface, following this link: https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/debugging_event_time.html

The logs produced by the Flink local cluster are different from those produced when I start my application directly. Why is the behavior inconsistent? The context timestamp sticks to Long.MIN_VALUE when debugging in the IDE; in the Flink cluster, however, it is updated correctly, except that the first record has the default value. But I am still not getting the scheduled logic triggered correctly inside the onTimer method.

My test workflow can be seen in the attachment. I have read through previous archive threads about the watermark not updating (sticking to Long.MIN_VALUE), but they do not help much in my case. Thanks.
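To make the dependency between the watermark and onTimer explicit: an event-time timer fires only once the operator's watermark reaches or passes the timer's timestamp. The sketch below is a plain-Java simulation of that rule, not Flink code; the class, its methods, and the 60-second timer offset are hypothetical and chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

final class EventTimeTimerSketch {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private long currentWatermark = Long.MIN_VALUE; // value before any watermark arrives

    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    long currentWatermark() {
        return currentWatermark;
    }

    // Returns the timestamps of the timers that fire for this watermark.
    List<Long> advanceWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        if (watermark <= currentWatermark) {
            return fired; // the event-time clock never moves backwards
        }
        currentWatermark = watermark;
        while (!timers.isEmpty() && timers.peek() <= currentWatermark) {
            fired.add(timers.poll());
        }
        return fired;
    }

    public static void main(String[] args) {
        EventTimeTimerSketch service = new EventTimeTimerSketch();
        long eventTimestamp = 1605047187881L;          // event timestamp from the log
        long timerTimestamp = eventTimestamp + 60_000L; // hypothetical timer offset
        service.registerEventTimeTimer(timerTimestamp);

        System.out.println(service.advanceWatermark(0L));               // []
        System.out.println(service.advanceWatermark(1605047172881L));   // []
        System.out.println(service.advanceWatermark(timerTimestamp));   // [1605047247881]
    }
}
```

In this simulation a watermark that stays at its starting value, or that trails the timer timestamp, never triggers the callback, which is consistent with the symptom described above.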
Best, Fuyao
On 11/11/20 11:33, [hidden email]
wrote:
The test workflow attachment was not included in the previous email; sorry for the confusion. Please refer to the workflow described in the text. Thanks.
On 11/12/20 16:17, [hidden email]
wrote: