Hi Community,
Regarding this problem, could someone give me an explanation? Thanks.
Best,
Fuyao
Hi Kevin,
Sorry for the name typo...
On 11/10/20 16:48, [hidden email] wrote:
Hi Kavin,
Thanks for your example. I think I have already done something very similar. I didn't post the full WatermarkStrategy implementation in my previous email, but I do have that part already. I think the example you gave me is a punctuated watermark strategy, not a bounded-out-of-orderness one. My main concern now is why my emitted watermark is not available in processElement(), and why I get 8 records each time the code reaches onPeriodicEmit(). I will post my code, following your example, below.
The symptom is that the context watermark is Long.MIN_VALUE when I use the watermark strategy below.
16:35:12,969 INFO org.myorg.quickstart.processor.TableOutputProcessFunction - context current key: 69215, context current watermark: -9223372036854775808
DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(table, Row.class);
retractStream
        .assignTimestampsAndWatermarks(new PunctuatedWatermarkStrategy())
        .keyBy(
                value -> {
                    String invoice_id_key = (String) value.f1.getField(0);
                    if (invoice_id_key == null) {
                        invoice_id_key = (String) value.f1.getField(4);
                    }
                    return invoice_id_key;
                })
        .process(new TableOutputProcessFunction())
        .name("ProcessTableOutput")
        .uid("ProcessTableOutput")
        .addSink(businessObjectSink)
        .name("businessObjectSink")
        .uid("businessObjectSink")
        .setParallelism(1);

Watermark strategy:
public class PunctuatedWatermarkStrategy implements WatermarkStrategy<Tuple2<Boolean, Row>> {
    @Override
    public WatermarkGenerator<Tuple2<Boolean, Row>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return new PunctuatedTableOutputWatermarkGenerator();
    }

    @Override
    public TimestampAssigner<Tuple2<Boolean, Row>> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        log.info("Inside timestamp assigner");
        return (booleanRowTuple2, previousElementTimestamp) -> {
            return my timestamp;
        };
    }
}

Watermark generator code:
public class PunctuatedTableOutputWatermarkGenerator implements WatermarkGenerator<Tuple2<Boolean, Row>> {
    @Override
    public void onEvent(Tuple2<Boolean, Row> booleanRowTuple2, long eventTimestamp, WatermarkOutput watermarkOutput) {
        watermarkOutput.emitWatermark(new Watermark(eventTimestamp));
        log.info("Emit Punctuated watermark: {}", eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
        // don't need to do anything because we emit in reaction to events above
    }
}

16:35:13,584 INFO org.myorg.quickstart.operator.PunctuatedTableOutputWatermarkGenerator - Emit Punctuated watermark: 1605054900905
From the log, I can see that it extracts the eventTimestamp and emits the watermark. Why can't I access this information in the processElement() function?
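Regarding the Long.MIN_VALUE symptom: as far as I can tell from the TimestampsAndWatermarksOperator source linked further down in this thread, the operator forwards each record downstream before calling onEvent(), so a watermark emitted from onEvent() arrives after the record that triggered it. The Flink-free sketch below models only that ordering, with a single upstream instance; all class and method names are hypothetical, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free simulation of the ORDER of events between an
// assigner operator and a downstream keyed operator. Not a Flink API.
public class PunctuatedOrderingSketch {
    // What the downstream processElement() observes, in arrival order.
    private final List<String> downstreamLog = new ArrayList<>();
    // Watermark currently known to the downstream operator.
    private long downstreamWatermark = Long.MIN_VALUE;

    // Models the downstream processElement(): it reads the current watermark.
    private void downstreamProcessElement(long eventTimestamp) {
        downstreamLog.add("element " + eventTimestamp + " sees watermark " + downstreamWatermark);
    }

    // Models a watermark arriving downstream; watermarks never move backwards.
    private void downstreamProcessWatermark(long watermark) {
        downstreamWatermark = Math.max(downstreamWatermark, watermark);
    }

    // Models the assigner: forward the record first, then call onEvent(),
    // which for a punctuated generator emits watermark == event timestamp.
    public void assignAndForward(long eventTimestamp) {
        downstreamProcessElement(eventTimestamp);
        downstreamProcessWatermark(eventTimestamp);
    }

    public List<String> log() {
        return downstreamLog;
    }

    public static void main(String[] args) {
        PunctuatedOrderingSketch sketch = new PunctuatedOrderingSketch();
        sketch.assignAndForward(1605054900905L);
        sketch.assignAndForward(1605054901905L);
        sketch.log().forEach(System.out::println);
    }
}
```

Under this model the first record always sees Long.MIN_VALUE, and each later record sees the watermark produced by the record before it.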
Any suggestions? Thank you so much!
Best regards,
Fuyao
On 11/10/20 16:04, Kevin Kwon wrote:
Hi Fuyao, I think you need to implement your own WatermarkStrategy class and register it with window.assignTimestampsAndWatermarks(new YourEventWatermarkStrategy()).
Make sure you use KafkaConsumer's assignTimestampsAndWatermarks if you're using Kafka consumers.
An example for a booking event that carries its own internal timestamp would be:
public class BookingWatermarkStrategy implements WatermarkStrategy<Booking> {

    @Override
    public WatermarkGenerator<Booking> createWatermarkGenerator(
            WatermarkGeneratorSupplier.Context context
    ) {
        return new WatermarkGenerator<Booking>() {
            private final long OUT_OF_ORDERNESS_MILLIS = 30;
            private long currentMaxTimestamp = Long.MIN_VALUE + OUT_OF_ORDERNESS_MILLIS + 1;

            @Override
            public void onEvent(Booking bookingEvent, long eventTimestamp, WatermarkOutput output) {
                currentMaxTimestamp = Math.max(currentMaxTimestamp, bookingEvent.getTimestamp());
                Watermark watermark = new Watermark(currentMaxTimestamp - OUT_OF_ORDERNESS_MILLIS - 1);
                output.emitWatermark(watermark);
            }

            @Override
            public void onPeriodicEmit(WatermarkOutput output) {
                // Do nothing since watermark will be emitted every event
            }
        };
    }

    @Override
    public TimestampAssigner<Booking> createTimestampAssigner(
            TimestampAssignerSupplier.Context context
    ) {
        return (booking, recordTimestamp) -> booking.getTimestamp();
    }
}
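Kevin's generator emits from onEvent() on every record, so it behaves like a punctuated strategy even though it subtracts an out-of-orderness bound. For comparison, a periodic bounded-out-of-orderness generator only tracks the maximum timestamp in onEvent() and emits from onPeriodicEmit(). As far as I know this mirrors the logic behind Flink's WatermarkStrategy.forBoundedOutOfOrderness(...), but the sketch below is Flink-free and its class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free model of a periodic bounded-out-of-orderness generator.
public class BoundedOutOfOrdernessSketch {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;
    // Watermarks "emitted" so far, recorded instead of sent downstream.
    private final List<Long> emitted = new ArrayList<>();

    public BoundedOutOfOrdernessSketch(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Start low enough that the first periodic emit yields Long.MIN_VALUE
        // instead of underflowing.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // Called per event: only tracks the maximum timestamp, emits nothing.
    public void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // Called once per auto-watermark interval.
    public void onPeriodicEmit() {
        emitted.add(maxTimestamp - outOfOrdernessMillis - 1);
    }

    public List<Long> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch gen = new BoundedOutOfOrdernessSketch(30);
        gen.onPeriodicEmit();   // no events yet
        gen.onEvent(1000);
        gen.onEvent(900);       // out of order: the maximum stays 1000
        gen.onPeriodicEmit();   // 1000 - 30 - 1
        System.out.println(gen.emitted());
    }
}
```

The only difference from the example above is where the watermark is emitted: here onEvent() never emits, so the watermark moves only when the auto-watermark interval fires onPeriodicEmit().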
On Wed, Nov 11, 2020 at 12:28 AM <[hidden email]> wrote:
Hi Experts,
I am trying to implement a KeyedProcessFunction with an onTimer()
callback. I need to use event time, and I have run into problems making
the watermark available to my operator; I am seeing some strange behavior.
I have a joined retract stream without watermark or timestamp
information, and I need to assign timestamps and watermarks to it. The
timestamp is just a field in the stream. The problems are in the watermark generator part.
Problems:
1. I can use a time-lag watermark generator and make it work. But with a
BoundedOutOfOrdernessGenerator, the
context.timerService().currentWatermark() in processElement() always
stays at its initial value and never updates.
2. I set the autoWatermark interval to 5 seconds for debugging, and I
only attach this watermark generator in one place, with parallelism 1.
However, I get 8 log records per interval. The time-lag policy advances
all 8 records, while the out-of-orderness policy advances only 1 of them. Maybe
this mismatch is causing processElement() to see the wrong
default watermark?
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator
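One plausible explanation for both problems, assuming the generator actually runs with 8 parallel instances (in the pipeline code earlier in the thread, setParallelism(1) is applied only to the sink, so the watermark assigner would keep the job's default parallelism), is that the keyed operator after keyBy() combines the watermarks of all upstream instances by taking their minimum. The Flink-free sketch below models only that min-combining step; the class and method names are hypothetical. Flink can also exclude inputs marked idle (e.g. via WatermarkStrategy#withIdleness), which this sketch does not model.

```java
import java.util.Arrays;

// Hypothetical, Flink-free model of a downstream operator with one input
// channel per upstream generator instance. It may only advance its watermark
// to the MINIMUM of the latest watermark received on each channel.
public class MinWatermarkSketch {
    private final long[] channelWatermarks;

    public MinWatermarkSketch(int upstreamParallelism) {
        channelWatermarks = new long[upstreamParallelism];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE); // nothing received yet
    }

    // Record a watermark arriving on one channel; watermarks never move backwards.
    public void onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
    }

    // Roughly what currentWatermark() would report in the downstream operator.
    public long currentWatermark() {
        return Arrays.stream(channelWatermarks).min().getAsLong();
    }

    public static void main(String[] args) {
        MinWatermarkSketch downstream = new MinWatermarkSketch(8);
        // Only the instance that receives data emits an event-based watermark.
        downstream.onWatermark(0, 1605054900905L);
        // The other 7 channels have sent nothing, so the minimum is unchanged.
        System.out.println(downstream.currentWatermark()); // -9223372036854775808
    }
}
```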
This is my code for the watermark generator:
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;

@Slf4j
public class PeriodicTableOutputWatermarkGenerator implements WatermarkGenerator<Tuple2<Boolean, Row>> {

    private final long maxTimeLag = 15000;
    private transient long currentMaxTimestamp = 15000;

    @Override
    public void onEvent(Tuple2<Boolean, Row> booleanRowTuple2, long eventTimestamp, WatermarkOutput output) {
        // the eventTimestamp is obtained through the TimestampAssigner, see
        // https://github.com/apache/flink/blob/f24cb3f3b7e773706188ae92998b3e1ffbf1829e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
        currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp);
        log.info("eventTimestamp in onEvent method: {}", eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Policy 1: time-lag strategy; this works and advances the watermark
        long watermarkEpochTime = Math.max(System.currentTimeMillis() - maxTimeLag, currentMaxTimestamp);
        output.emitWatermark(new Watermark(watermarkEpochTime));

        // Policy 2: periodic emit based on the events seen so far
        long periodicEmitWatermarkTime = currentMaxTimestamp - maxTimeLag;
        // output.emitWatermark(new Watermark(periodicEmitWatermarkTime));

        log.info("Emit Watermark: watermark based on system time: {}, periodicEmitWatermarkTime: {}, currentMaxTimestamp: {}",
                watermarkEpochTime, periodicEmitWatermarkTime, currentMaxTimestamp);
    }
}
This is the log printed by the slf4j logger above. Every interval it prints
8 records. Why 8? In theory I would expect 1, so I am very confused. Also,
policy 1 advances all 8 records, while policy 2 advances only 1 of the 8,
and that value is not reflected in processElement().
14:28:01,199 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator -
Emit Watermark: watermark based on system time: 1605047266198,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator -
Emit Watermark: watermark based on system time: 1605047266199,
periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp: 1605047187881
14:28:01,199 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator -
Emit Watermark: watermark based on system time: 1605047266199,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator -
Emit Watermark: watermark based on system time: 1605047266198,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator -
Emit Watermark: watermark based on system time: 1605047266198,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator -
Emit Watermark: watermark based on system time: 1605047266198,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator -
Emit Watermark: watermark based on system time: 1605047266198,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator -
Emit Watermark: watermark based on system time: 1605047266198,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator -
Emit Watermark: watermark based on system time: 1605047271200,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator -
Emit Watermark: watermark based on system time: 1605047271200,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator -
Emit Watermark: watermark based on system time: 1605047271200,
periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp: 1605047187881
14:28:06,200 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator -
Emit Watermark: watermark based on system time: 1605047271200,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator -
Emit Watermark: watermark based on system time: 1605047271200,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator -
Emit Watermark: watermark based on system time: 1605047271200,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator -
Emit Watermark: watermark based on system time: 1605047271200,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator -
Emit Watermark: watermark based on system time: 1605047271200,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
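Reading this log as 8 independent copies of the generator, each with its own currentMaxTimestamp and only one of which has seen an event, reproduces the logged values (an interpretation, not something the log proves). The Flink-free sketch below recomputes both policies per instance from the logged numbers and then takes the minimum across instances, which is what a single downstream operator would be limited to. The class name and the wall-clock value `now` are hypothetical.

```java
import java.util.Arrays;

// Hypothetical, Flink-free recomputation of the two policies in
// PeriodicTableOutputWatermarkGenerator for several parallel instances.
public class ParallelGeneratorSketch {
    static final long MAX_TIME_LAG = 15000L;

    // Policy 1 above: max(now - maxTimeLag, currentMaxTimestamp).
    static long policy1(long nowMillis, long currentMaxTimestamp) {
        return Math.max(nowMillis - MAX_TIME_LAG, currentMaxTimestamp);
    }

    // Policy 2 above: currentMaxTimestamp - maxTimeLag.
    static long policy2(long currentMaxTimestamp) {
        return currentMaxTimestamp - MAX_TIME_LAG;
    }

    // A downstream operator can only advance to the minimum over all instances.
    static long combined(long[] perInstanceWatermarks) {
        return Arrays.stream(perInstanceWatermarks).min().getAsLong();
    }

    public static void main(String[] args) {
        // Assumption: 8 instances; only instance 1 has received data.
        long[] currentMax = new long[8];
        Arrays.fill(currentMax, 15000L);   // initial currentMaxTimestamp
        currentMax[1] = 1605047187881L;    // value taken from the log
        long now = 1605047281199L;         // hypothetical wall-clock time

        long[] p1 = new long[currentMax.length];
        long[] p2 = new long[currentMax.length];
        for (int i = 0; i < currentMax.length; i++) {
            p1[i] = policy1(now, currentMax[i]);
            p2[i] = policy2(currentMax[i]);
        }
        System.out.println("policy 1: " + Arrays.toString(p1) + ", combined: " + combined(p1));
        System.out.println("policy 2: " + Arrays.toString(p2) + ", combined: " + combined(p2));
    }
}
```

Under this reading, policy 1 advances every instance (wall clock minus the lag), so the minimum moves as well; policy 2 leaves 7 instances at 0, which pins the combined watermark at 0.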
Any insights? Thank you very much!
Best,
Fuyao