The test workflow attachment was not included in the previous email.
Sorry for the confusion; please refer to the workflow described in the
text. Thanks.
Hi All,
Just to add a little more context to the problem: I have a full outer join operation before this stage, and the source data streams for the full outer join are Kafka sources. I also added timestamps and watermarks to the FlinkKafkaConsumer, but it makes no difference to the result; I still cannot make the watermark advance.
Overall workflow:
two Kafka topics -> two data streams in Flink -> join them and convert the result to a retract stream -> run a KeyedProcessFunction that schedules event-time timers and implements the onTimer logic -> push to the downstream sink.
I don't think there are any issues with my syntax, but I still can NOT make the watermark advance in event time with the bounded-out-of-orderness strategy. (In the Flink cluster, the behavior is different: the watermark advances, but onTimer is still not triggered correctly.) :(
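For reference, the condition under which an event-time timer fires can be modeled in a few lines. The sketch below is a plain-Java simulation for a single key, not Flink's API; the class and method names are hypothetical. It assumes the rule Flink applies to event-time timers: a timer registered for time t fires once the operator's current watermark reaches t, so a watermark that never leaves Long.MIN_VALUE means onTimer() is never called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Plain-Java simulation (hypothetical, not Flink's API) of event-time timers
 * for a single key: a timer for time t fires once the watermark is >= t.
 */
class EventTimeTimerSim {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private long currentWatermark = Long.MIN_VALUE; // nothing received yet

    /** Analogous to ctx.timerService().registerEventTimeTimer(t). */
    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    /** Advances the watermark and returns the timers that fire, earliest first. */
    List<Long> advanceWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        if (watermark <= currentWatermark) {
            return fired; // watermarks only move forward
        }
        currentWatermark = watermark;
        while (!timers.isEmpty() && timers.peek() <= currentWatermark) {
            fired.add(timers.poll()); // in Flink, onTimer(...) would run here
        }
        return fired;
    }

    long currentWatermark() {
        return currentWatermark;
    }

    public static void main(String[] args) {
        EventTimeTimerSim sim = new EventTimeTimerSim();
        sim.registerEventTimeTimer(1_000L);
        sim.registerEventTimeTimer(5_000L);
        System.out.println(sim.advanceWatermark(0L));     // []
        System.out.println(sim.advanceWatermark(1_000L)); // [1000]
        System.out.println(sim.advanceWatermark(9_000L)); // [5000]
    }
}
```

Under this model, a timer only fires if the watermark observed by the keyed operator itself advances past it; a watermark that advances somewhere upstream is not enough on its own.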
I guess the reason is that I receive 8 records on each round of onPeriodicEmit(), and only one of the eight is updated with the BoundedOutOfOrderness strategy. With the timelag strategy (see the first email in the thread), all of them are updated, so the watermark advances. I just don't know why I get 8 records every time even though I set the parallelism to 1. (The logs can be found in the first email in the thread.)
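The effect of having 8 generator instances can be checked with simple arithmetic. The following plain-Java simulation (not Flink code; names are hypothetical) assumes that a downstream operator's watermark is the minimum of the latest watermarks from all of its input channels, that 8 parallel generator instances feed the keyed operator, and that only one of them has received records. The timestamps are taken from the log in the first email; `now` is a hypothetical wall-clock reading chosen to match the logged "watermark based on system time".

```java
import java.util.Arrays;

/**
 * Plain-Java simulation (not Flink code): the downstream watermark is the
 * minimum of the latest watermark received on each input channel.
 */
class MinWatermarkSim {
    static final long MAX_TIME_LAG = 15_000L;          // maxTimeLag in the generator
    static final long INITIAL_MAX_TIMESTAMP = 15_000L; // initial currentMaxTimestamp

    /** Policy 2 from the post: currentMaxTimestamp - maxTimeLag. */
    static long policy2(long currentMaxTimestamp) {
        return currentMaxTimestamp - MAX_TIME_LAG;
    }

    /** Policy 1 from the post: max(now - maxTimeLag, currentMaxTimestamp). */
    static long policy1(long nowMillis, long currentMaxTimestamp) {
        return Math.max(nowMillis - MAX_TIME_LAG, currentMaxTimestamp);
    }

    /** Watermark seen downstream: the minimum over all input channels. */
    static long combined(long[] perInstanceWatermarks) {
        return Arrays.stream(perInstanceWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        int parallelism = 8; // assumption: 8 generator instances, as in the log
        long[] maxTs = new long[parallelism];
        Arrays.fill(maxTs, INITIAL_MAX_TIMESTAMP);
        maxTs[1] = 1_605_047_187_881L; // only one instance has seen records

        long now = 1_605_047_281_198L; // hypothetical wall-clock reading
        long[] p1 = new long[parallelism];
        long[] p2 = new long[parallelism];
        for (int i = 0; i < parallelism; i++) {
            p1[i] = policy1(now, maxTs[i]);
            p2[i] = policy2(maxTs[i]);
        }
        System.out.println("Policy 1 downstream watermark: " + combined(p1)); // 1605047266198
        System.out.println("Policy 2 downstream watermark: " + combined(p2)); // 0
    }
}
```

In this model, Policy 1 advances because every instance, including the idle ones, derives its watermark from the wall clock, while Policy 2 is held at 0 by the seven instances that have not seen any records. One plausible (unconfirmed) reason for the 8 instances is that `.setParallelism(1)` in the pipeline posted in this thread is called on the result of `addSink(...)`, so it applies only to the sink; the other operators run with the default parallelism, which for a local environment started from the IDE is the number of CPU cores. If that is the case, setting the parallelism on the operator returned by `assignTimestampsAndWatermarks(...)`, or trying `WatermarkStrategy#withIdleness(...)` (available since Flink 1.11), may be worth testing.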
I also tried to debug in the Flink web interface, following this page: https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/debugging_event_time.html
The logs produced by the Flink local cluster are different from those produced when I start my application directly. Why is the behavior inconsistent? The context timestamp sticks to Long.MIN_VALUE during IDE debugging; however, it is updated correctly in the Flink cluster, except that the first record still has the default value. But I am still not getting the scheduled logic triggered correctly inside the onTimer method. My test workflow can be seen in the attachment. I have read through previous archive threads about the watermark not being updated (stuck at Long.MIN_VALUE), but they don't help much in my case. Thanks.
Best,
Fuyao
On 11/11/20 11:33, [hidden email] wrote:
Hi Community,
Regarding this problem, could someone give me an explanation? Thanks.
Best,
Fuyao
On 11/10/20 16:56, [hidden email] wrote:
Hi Kevin,
Sorry for the name typo...
On 11/10/20 16:48, [hidden email] wrote:
Hi Kavin,
Thanks for your example. I think I have already done something very similar. I didn't post the full WatermarkStrategy implementation in my previous email, but I do have that part already. I think the example you gave me is a punctuated watermark strategy, not a bounded-out-of-orderness one. My main concerns now are why my emitted watermark is not visible in processElement() and why I get 8 records each time the code reaches onPeriodicEmit(). I will post my code, following your example, below.
The symptom is that the context watermark is Long.MIN_VALUE when I use the watermark strategy below.
16:35:12,969 INFO org.myorg.quickstart.processor.TableOutputProcessFunction - context current key: 69215, context current watermark: -9223372036854775808
DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(table, Row.class);
retractStream
        .assignTimestampsAndWatermarks(new PunctuatedWatermarkStrategy())
        .keyBy(value -> {
            String invoice_id_key = (String) value.f1.getField(0);
            if (invoice_id_key == null) {
                invoice_id_key = (String) value.f1.getField(4);
            }
            return invoice_id_key;
        })
        .process(new TableOutputProcessFunction())
        .name("ProcessTableOutput")
        .uid("ProcessTableOutput")
        .addSink(businessObjectSink)
        .name("businessObjectSink")
        .uid("businessObjectSink")
        .setParallelism(1);
Watermark strategy:
public class PunctuatedWatermarkStrategy implements WatermarkStrategy<Tuple2<Boolean, Row>> {
    @Override
    public WatermarkGenerator<Tuple2<Boolean, Row>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return new PunctuatedTableOutputWatermarkGenerator();
    }
    @Override
    public TimestampAssigner<Tuple2<Boolean, Row>> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        log.info("Inside timestamp assigner");
        return (booleanRowTuple2, previousElementTimestamp) -> {
            return my timestamp; // placeholder in the original post: the event timestamp extracted from the row
        };
    }
}
Watermark generator code:
public class PunctuatedTableOutputWatermarkGenerator implements WatermarkGenerator<Tuple2<Boolean, Row>> {
    @Override
    public void onEvent(Tuple2<Boolean, Row> booleanRowTuple2, long eventTimestamp, WatermarkOutput watermarkOutput) {
        watermarkOutput.emitWatermark(new Watermark(eventTimestamp));
        log.info("Emit Punctuated watermark: {}", eventTimestamp);
    }
    @Override
    public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
        // don't need to do anything because we emit in reaction to events above
    }
}
16:35:13,584 INFO org.myorg.quickstart.operator.PunctuatedTableOutputWatermarkGenerator - Emit Punctuated watermark: 1605054900905
From the log, I can see that it extracts the eventTimestamp and emits the watermark. Why can't I access this information in the processElement() function?
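The minimum-over-inputs rule would also explain the Long.MIN_VALUE in the log above if the punctuated generator runs with more than one parallel instance. In the plain-Java simulation below (not Flink code; the class name is hypothetical and the parallelism of 8 is an assumption taken from the other emails in this thread), only instances that receive records ever emit a watermark; idle instances keep the initial value, so the minimum seen by processElement() never moves.

```java
import java.util.Arrays;

/**
 * Plain-Java simulation (not Flink code): with a punctuated generator an
 * instance emits a watermark only when it receives a record, so an idle
 * instance stays at Long.MIN_VALUE and holds the downstream minimum there.
 */
class PunctuatedIdleSim {
    private final long[] lastWatermark;

    PunctuatedIdleSim(int parallelism) {
        lastWatermark = new long[parallelism];
        Arrays.fill(lastWatermark, Long.MIN_VALUE); // nothing emitted yet
    }

    /** Mirrors onEvent(): emit a watermark equal to the event timestamp. */
    void onEvent(int instance, long eventTimestamp) {
        // watermarks only move forward, so an older timestamp does not lower it
        lastWatermark[instance] = Math.max(lastWatermark[instance], eventTimestamp);
    }

    /** Watermark visible to the downstream keyed operator. */
    long downstreamWatermark() {
        return Arrays.stream(lastWatermark).min().getAsLong();
    }

    public static void main(String[] args) {
        PunctuatedIdleSim sim = new PunctuatedIdleSim(8);
        sim.onEvent(3, 1_605_054_900_905L); // timestamp from the log above
        System.out.println(sim.downstreamWatermark()); // -9223372036854775808
    }
}
```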
Any suggestions? Thank you so much!
Best regards,
Fuyao
On 11/10/20 16:04, Kevin Kwon wrote:
Hi Fuyao, I think you need to implement your own WatermarkStrategy class and register it with window.assignTimestampsAndWatermarks(new YourEventWatermarkStrategy()).
Make sure you use the Kafka consumer's assignTimestampsAndWatermarks if you're using Kafka consumers.
Example code for a booking event that has its own internal timestamp would be:
public class BookingWatermarkStrategy implements WatermarkStrategy<Booking> {
    @Override
    public WatermarkGenerator<Booking> createWatermarkGenerator(
            WatermarkGeneratorSupplier.Context context
    ) {
        return new WatermarkGenerator<Booking>() {
            private final long OUT_OF_ORDERNESS_MILLIS = 30;
            private long currentMaxTimestamp = Long.MIN_VALUE + OUT_OF_ORDERNESS_MILLIS + 1;
            @Override
            public void onEvent(Booking bookingEvent, long eventTimestamp, WatermarkOutput output) {
                currentMaxTimestamp = Math.max(currentMaxTimestamp, bookingEvent.getTimestamp());
                Watermark watermark = new Watermark(currentMaxTimestamp - OUT_OF_ORDERNESS_MILLIS - 1);
                output.emitWatermark(watermark);
            }
            @Override
            public void onPeriodicEmit(WatermarkOutput output) {
                // Do nothing since watermark will be emitted every event
            }
        };
    }
    @Override
    public TimestampAssigner<Booking> createTimestampAssigner(
            TimestampAssignerSupplier.Context context
    ) {
        return (booking, recordTimestamp) -> booking.getTimestamp();
    }
}
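The arithmetic in this example is easy to check in isolation. The plain-Java sketch below (not Flink code; the class name is hypothetical) reproduces the generator's state: the watermark is maxTimestamp - OUT_OF_ORDERNESS_MILLIS - 1, and the running maximum starts at Long.MIN_VALUE + OUT_OF_ORDERNESS_MILLIS + 1 so that the first subtraction cannot underflow.

```java
/**
 * Plain-Java check of the bounded-out-of-orderness arithmetic used in the
 * BookingWatermarkStrategy example (hypothetical class, not Flink code).
 */
class BoundedOutOfOrdernessMath {
    static final long OUT_OF_ORDERNESS_MILLIS = 30;

    // Initialised so that currentWatermark() starts exactly at Long.MIN_VALUE.
    private long currentMaxTimestamp = Long.MIN_VALUE + OUT_OF_ORDERNESS_MILLIS + 1;

    /** Updates the running maximum and returns the resulting watermark. */
    long onEvent(long eventTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
        return currentWatermark();
    }

    long currentWatermark() {
        return currentMaxTimestamp - OUT_OF_ORDERNESS_MILLIS - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessMath gen = new BoundedOutOfOrdernessMath();
        System.out.println(gen.currentWatermark()); // -9223372036854775808
        System.out.println(gen.onEvent(1_000));     // 969
        System.out.println(gen.onEvent(990));       // 969: an out-of-order event does not move it back
    }
}
```

Starting the maximum at plain Long.MIN_VALUE instead would make the first watermark wrap around to a huge positive value. As far as I know, Flink's built-in WatermarkStrategy.forBoundedOutOfOrderness(...) uses the same formula, but emits the watermark from onPeriodicEmit() rather than from every onEvent() call.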
On Wed, Nov 11, 2020 at 12:28 AM <[hidden email]> wrote:
Hi Experts,
I am trying to implement a KeyedProcessFunction with an onTimer()
callback. I need to use event time, and I am having problems making
the watermark available to my operator; I see some strange behavior.
I have a joined retract stream without watermark or timestamp
information, and I need to assign timestamps and watermarks to it. The
timestamp is just a field in the stream. The problems are in the
watermark generator part.
Problems:
1. I can use a timelag watermark generator and make it work. But with
the bounded-out-of-orderness generator, the
context.timerService().currentWatermark() in processElement() always
sticks to the initial value and never updates.
2. I set the autoWatermark interval to 5 seconds for debugging, and I
only attach this watermark generator in one place, with parallelism 1.
However, I am getting 8 records at a time. The timelag policy advances
all 8 records, while the out-of-orderness policy only advances 1 of
them. Maybe the mismatch is causing processElement() to see the wrong
default watermark?
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator
This is my code for the watermark generator:
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;
@Slf4j
public class PeriodicTableOutputWatermarkGenerator implements
        WatermarkGenerator<Tuple2<Boolean, Row>> {
    private final long maxTimeLag = 15000;
    private transient long currentMaxTimestamp = 15000;
    @Override
    public void onEvent(Tuple2<Boolean, Row> booleanRowTuple2, long eventTimestamp,
            WatermarkOutput output) {
        // the eventTimestamp is obtained through the TimestampAssigner, see
        // https://github.com/apache/flink/blob/f24cb3f3b7e773706188ae92998b3e1ffbf1829e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
        currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp);
        log.info("eventTimestamp in onEvent method: {}", eventTimestamp);
    }
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Policy 1: timelag strategy, works and advances the watermark
        long watermarkEpochTime = Math.max(System.currentTimeMillis() - maxTimeLag, currentMaxTimestamp);
        output.emitWatermark(new Watermark(watermarkEpochTime));
        // Policy 2: periodic emit based on event
        long periodicEmitWatermarkTime = currentMaxTimestamp - maxTimeLag;
        // output.emitWatermark(new Watermark(periodicEmitWatermarkTime));
        log.info("Emit Watermark: watermark based on system time: {}, "
                + "periodicEmitWatermarkTime: {}, currentMaxTimestamp: {}",
                watermarkEpochTime, periodicEmitWatermarkTime, currentMaxTimestamp);
    }
}
This is the log printed by the slf4j logger above. Each time, it gives
me 8 records. Why 8 records? I think it should be 1 in theory, so I
am very confused. Also, policy 1 advances all 8 records, while policy
2 advances only 1 of the 8 records, and that is not reflected in
processElement().
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266199, periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp: 1605047187881
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266199, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp: 1605047187881
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
Any insights? Thank you very much!
Best,
Fuyao