BoundedOutOfOrderness Watermark Generator is NOT making the event time to advance


Fuyao Li-2
Hi Experts,

I am trying to implement a KeyedProcessFunction with an onTimer()
callback. I need to use event time, and I am having trouble making
the watermark available to my operator. I see some strange behavior.

I have a joined retract stream without watermark or timestamp
information, and I need to assign timestamps and watermarks to it. The
timestamp is just a field in the stream. My questions are about the
watermark generator part.

Problems:

1. A time-lag watermark generator works. But with a
BoundedOutOfOrderness-style generator, the value of
context.timerService().currentWatermark() in processElement() always
sticks to its initial value and never updates.

2. I set the auto-watermark interval to 5 seconds for debugging, and I
attach this watermark generator in only one place, with parallelism 1.
However, I get 8 log records at a time. The time-lag policy advances
all 8 records; the out-of-orderness policy advances only 1 record. Maybe
this mismatch causes processElement() to see the wrong
default watermark?

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator

This is my code for the watermark generator:

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;

@Slf4j
public class PeriodicTableOutputWatermarkGenerator
        implements WatermarkGenerator<Tuple2<Boolean, Row>> {
     private final long maxTimeLag = 15000;
     private transient long currentMaxTimestamp = 15000;

     @Override
     public void onEvent(Tuple2<Boolean, Row> booleanRowTuple2, long eventTimestamp,
             WatermarkOutput output) {
         // eventTimestamp is obtained through the TimestampAssigner, see
         // https://github.com/apache/flink/blob/f24cb3f3b7e773706188ae92998b3e1ffbf1829e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
         currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp);
         log.info("eventTimestamp in onEvent method: {}", eventTimestamp);
     }

     @Override
     public void onPeriodicEmit(WatermarkOutput output) {
         // Policy 1: time-lag strategy; this works and advances the watermark
         long watermarkEpochTime =
                 Math.max(System.currentTimeMillis() - maxTimeLag, currentMaxTimestamp);
         output.emitWatermark(new Watermark(watermarkEpochTime));

         // Policy 2: periodic emit based on the largest event timestamp seen so far
         long periodicEmitWatermarkTime = currentMaxTimestamp - maxTimeLag;
         // output.emitWatermark(new Watermark(periodicEmitWatermarkTime));

         log.info("Emit Watermark: watermark based on system time: {}, "
                 + "periodicEmitWatermarkTime: {}, currentMaxTimestamp: {}",
                 watermarkEpochTime, periodicEmitWatermarkTime, currentMaxTimestamp);
     }
}


This is the output of the log statement above. Every time, it gives
me 8 records. Why 8? I think it should be 1 in theory. I
am very confused. Also, policy 1 advances all 8 records, while policy
2 advances only 1 of the 8, and the advance is not reflected in
processElement().

14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266199, periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp: 1605047187881
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266199, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000

14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp: 1605047187881
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
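
For readers following along, the two columns in the log can be reproduced with plain arithmetic. The sketch below is not Flink code: the class and method names are hypothetical, and the wall-clock value is an assumption derived from the first logged watermark plus the 15-second lag. It only shows why an instance that never saw an event reports 0 under policy 2, while policy 1 moves forward with the system clock regardless.

```java
// Sketch only: plain Java, no Flink dependency. Class and method names are
// hypothetical; the numbers are taken from the log lines in this post.
final class WatermarkPolicySketch {
    static final long MAX_TIME_LAG = 15_000L;

    // Policy 1 as written in the post: never falls below wall-clock time minus the lag.
    static long timeLagWatermark(long nowMillis, long currentMaxTimestamp) {
        return Math.max(nowMillis - MAX_TIME_LAG, currentMaxTimestamp);
    }

    // Policy 2: depends only on the largest event timestamp seen so far.
    static long eventBasedWatermark(long currentMaxTimestamp) {
        return currentMaxTimestamp - MAX_TIME_LAG;
    }

    public static void main(String[] args) {
        long now = 1_605_047_281_198L;    // assumed wall clock (logged watermark + 15 s)
        long idle = 15_000L;              // initial currentMaxTimestamp, no event seen yet
        long active = 1_605_047_187_881L; // the one instance that has seen an event

        System.out.println(timeLagWatermark(now, idle));  // 1605047266198
        System.out.println(eventBasedWatermark(idle));    // 0
        System.out.println(eventBasedWatermark(active));  // 1605047172881
    }
}
```

Under policy 2, an instance that receives no events stays at 0 no matter how much time passes, which matches seven of the eight log lines in each round.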


Any insights? Thank you very much!


Best,

Fuyao

Re: BoundedOutOfOrderness Watermark Generator is NOT making the event time to advance

Fuyao Li-2

Hi Community,


Regarding this problem, could someone give me an explanation? Thanks.

Best,

Fuyao

On 11/10/20 16:56, [hidden email] wrote:

Hi Kevin,

Sorry for the name typo...

On 11/10/20 16:48, [hidden email] wrote:

Hi Kavin,

Thanks for your example. I think I have already done something very similar before. I didn't post the full WatermarkStrategy implementation in my previous email, but I do have that part already. I think the example you gave me is a punctuated watermark strategy, not a bounded-out-of-orderness one. My main concern now is why my emitted watermark is not available in processElement(), and why I get 8 records each time the code reaches onPeriodicEmit(). I will post my code, following your example, below.

The symptom is that the context watermark is Long.MIN_VALUE if I use the watermark strategy below.

16:35:12,969 INFO  org.myorg.quickstart.processor.TableOutputProcessFunction     - context current key: 69215, context current watermark: -9223372036854775808


DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(table, Row.class);
retractStream
    .assignTimestampsAndWatermarks(new PunctuatedWatermarkStrategy())
    .keyBy(
        value -> {
          String invoice_id_key = (String) value.f1.getField(0);
          if (invoice_id_key == null) {
            invoice_id_key = (String) value.f1.getField(4);
          }
          return invoice_id_key;
        })
    .process(new TableOutputProcessFunction())
    .name("ProcessTableOutput")
    .uid("ProcessTableOutput")
    .addSink(businessObjectSink)
    .name("businessObjectSink")
    .uid("businessObjectSink")
    .setParallelism(1);

watermark strategy:

public class PunctuatedWatermarkStrategy implements WatermarkStrategy<Tuple2<Boolean, Row>> {
    @Override
    public WatermarkGenerator<Tuple2<Boolean, Row>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return new PunctuatedTableOutputWatermarkGenerator();
    }

    @Override
    public TimestampAssigner<Tuple2<Boolean, Row>> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        log.info("Inside timestamp assigner");
        return (booleanRowTuple2, previousElementTimestamp) -> {
            return my timestamp;
        };
    }
}

watermark generator code:

public class PunctuatedTableOutputWatermarkGenerator  implements WatermarkGenerator<Tuple2<Boolean, Row>> {
    @Override
    public void onEvent(Tuple2<Boolean, Row> booleanRowTuple2, long eventTimestamp, WatermarkOutput watermarkOutput) {
        watermarkOutput.emitWatermark(new Watermark(eventTimestamp));
        log.info("Emit Punctuated watermark: {}", eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
        // don't need to do anything because we emit in reaction to events above
    }
}

16:35:13,584 INFO  org.myorg.quickstart.operator.PunctuatedTableOutputWatermarkGenerator  - Emit Punctuated watermark: 1605054900905

From the log, I can see that it extracts the eventTimestamp and emits the watermark. Why can't I access this information in the processElement() function?

Any suggestions? Thank you so much!


Best regards,

Fuyao



On 11/10/20 16:04, Kevin Kwon wrote:
Hi Fuyao, I think you need to implement your own WatermarkStrategy class and register it with window.assignTimestampsAndWatermarks(new YourEventWatermarkStrategy())

Make sure you use the Kafka consumer's assignTimestampsAndWatermarks if you're using Kafka consumers.

Example code for a booking event that has its own internal timestamp would be:

public class BookingWatermarkStrategy implements WatermarkStrategy<Booking> {

  @Override
  public WatermarkGenerator<Booking> createWatermarkGenerator(
      WatermarkGeneratorSupplier.Context context
  ) {
    return new WatermarkGenerator<Booking>() {
      private final long OUT_OF_ORDERNESS_MILLIS = 30;
      private long currentMaxTimestamp = Long.MIN_VALUE + OUT_OF_ORDERNESS_MILLIS + 1;

      @Override
      public void onEvent(Booking bookingEvent, long eventTimestamp, WatermarkOutput output) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, bookingEvent.getTimestamp());
        Watermark watermark = new Watermark(currentMaxTimestamp - OUT_OF_ORDERNESS_MILLIS - 1);
        output.emitWatermark(watermark);
      }

      @Override
      public void onPeriodicEmit(WatermarkOutput output) {
        // Do nothing since watermark will be emitted every event
      }
    };
  }

  @Override
  public TimestampAssigner<Booking> createTimestampAssigner(
      TimestampAssignerSupplier.Context context
  ) {
    return (booking, recordTimestamp) -> booking.getTimestamp();
  }
}
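
A side note on the initial value `Long.MIN_VALUE + OUT_OF_ORDERNESS_MILLIS + 1` in the example above: it guards against arithmetic underflow when the bound is subtracted before any event has been seen. The plain-Java sketch below (hypothetical class and method names, no Flink dependency) shows what the subtraction yields with and without the guard.

```java
// Sketch only: illustrates the underflow guard, not a Flink class.
final class InitialMaxTimestampSketch {
    // Same formula as in the example: max timestamp minus the bound, minus 1.
    static long watermarkFor(long currentMaxTimestamp, long outOfOrdernessMillis) {
        return currentMaxTimestamp - outOfOrdernessMillis - 1;
    }

    // Guarded starting point, chosen so the first subtraction lands on Long.MIN_VALUE.
    static long guardedInitialMax(long outOfOrdernessMillis) {
        return Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    public static void main(String[] args) {
        long bound = 30L;

        // Naive start: the subtraction wraps around to a huge positive value.
        long naive = watermarkFor(Long.MIN_VALUE, bound);
        System.out.println(naive == Long.MAX_VALUE - 30); // true

        // Guarded start: the result is exactly Long.MIN_VALUE, meaning "no watermark yet".
        long guarded = watermarkFor(guardedInitialMax(bound), bound);
        System.out.println(guarded == Long.MIN_VALUE);    // true
    }
}
```

The guard matters most for a periodic generator, which may be asked to emit before the first event arrives. In a generator that emits only from onEvent(), the maximum has already been updated by the time the subtraction runs.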

On Wed, Nov 11, 2020 at 12:28 AM <[hidden email]> wrote:
[quoted text trimmed: identical to the first message in this thread]

Re: BoundedOutOfOrderness Watermark Generator is NOT making the event time to advance

Fuyao Li-2

Hi All,

Just to add a little more context to the problem: I have a full outer join operation before this stage. The source data streams for the full outer join are Kafka sources. I also added timestamps and watermarks to the FlinkKafkaConsumer. It makes no difference to the result; I still cannot make the watermark advance.

overall workflow:

two kafka topics -> two data streams in Flink -> join them together and convert to retract stream -> do KeyedProcessFunction and schedule event time timer and onTimer logic in it -> push to downstream sink.

I think there are no issues with my syntax. But I still could NOT make the watermark advance for event time using the bounded-out-of-orderness strategy. (In the Flink cluster the behavior is different: the watermark advances, but onTimer is still not triggered correctly. :( )

I guess the reason is that I receive 8 records for each round of onPeriodicEmit(), and only one of the eight is updated with the BoundedOutOfOrderness strategy. With the time-lag strategy (see the first email in the thread), they are all updated, so the watermark advances. I just don't know why I get 8 records every time even though I have parallelism set to 1. (Logs can be found in the first email in the thread.)
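
One way to test this guess on paper: Flink sets an operator's current watermark to the minimum of the watermarks on its input channels. If the eight log lines come from eight parallel instances of the generator (an assumption; nothing in the thread confirms it, though setParallelism(1) in the pipeline shown earlier is applied only to the sink), a single idle instance would hold the downstream watermark back. The sketch below is plain Java with hypothetical names, fed with the values from the first log round.

```java
import java.util.Arrays;

// Sketch only: models "downstream watermark = minimum over inputs" in plain Java.
final class MinWatermarkSketch {
    // Returns the watermark a downstream task would see for the given upstream values.
    static long combine(long[] upstreamWatermarks) {
        return Arrays.stream(upstreamWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Policy 2: seven instances still at 0, one that has seen an event.
        long[] policy2 = new long[8];
        Arrays.fill(policy2, 0L);
        policy2[1] = 1_605_047_172_881L;

        // Policy 1: every instance follows the system clock.
        long[] policy1 = new long[8];
        Arrays.fill(policy1, 1_605_047_266_198L);
        policy1[1] = 1_605_047_266_199L;
        policy1[2] = 1_605_047_266_199L;

        System.out.println(combine(policy2)); // 0
        System.out.println(combine(policy1)); // 1605047266198
    }
}
```

If that assumption holds, two things may be worth checking: the parallelism of the operator that assignTimestampsAndWatermarks is attached to, and whether WatermarkStrategy.withIdleness(Duration) is appropriate for instances that never receive data.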

I also tried to debug inside the Flink web interface, based on this link: https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/debugging_event_time.html

The logs produced by the Flink local cluster are different from those produced when I start my application directly. Why is the behavior inconsistent? The context timestamp sticks to Long.MIN_VALUE during IDE debugging; however, it is updated correctly in the Flink cluster, except that the first record sees the default value. But I am still not getting the scheduled logic triggered correctly inside the onTimer method. My test workflow can be seen in the attachment. I have read through previous archive threads about the watermark not updating (sticking to Long.MIN_VALUE), but they don't help much in my case. Thanks.
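
On the onTimer() part: an event-time timer fires only once the operator's watermark reaches the timer's timestamp, so a watermark that lags behind (or never advances) leaves timers pending. The sketch below models just that rule in plain Java; it is not the Flink timer service, the class name is hypothetical, and the 5-second timer delay is an invented value for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Sketch only: a toy model of event-time timers driven by watermarks.
final class EventTimeTimerSketch {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private long currentWatermark = Long.MIN_VALUE;

    void registerTimer(long timestamp) {
        timers.add(timestamp);
    }

    // Advances the watermark and returns the timers that fire as a result.
    List<Long> advanceWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        if (watermark <= currentWatermark) {
            return fired; // watermarks never move backwards
        }
        currentWatermark = watermark;
        while (!timers.isEmpty() && timers.peek() <= currentWatermark) {
            fired.add(timers.poll());
        }
        return fired;
    }

    public static void main(String[] args) {
        EventTimeTimerSketch service = new EventTimeTimerSketch();
        // Event timestamp from the log plus a hypothetical 5-second delay.
        service.registerTimer(1_605_047_187_881L + 5_000L);

        System.out.println(service.advanceWatermark(0L));                 // []
        System.out.println(service.advanceWatermark(1_605_047_172_881L)); // []
        System.out.println(service.advanceWatermark(1_605_047_192_881L)); // [1605047192881]
    }
}
```

With a 15-second out-of-orderness bound, the watermark trails the newest event by at least 15 seconds, so in this model a timer stays pending until a sufficiently later event arrives.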


Best,

Fuyao



On 11/11/20 11:33, [hidden email] wrote:
[quoted text trimmed: identical to the previous message in this thread]


Re: BoundedOutOfOrderness Watermark Generator is NOT making the event time to advance

Fuyao Li-2

The test workflow attachment was not added to the previous email; sorry for the confusion. Please refer to the workflow described in the text. Thanks.


On 11/12/20 16:17, [hidden email] wrote:

Hi All,

Just to add a little more context to the problem. I have a full outer join operation before this stage. The source data stream for full outer join is a Kafka Source. I also added timestamp and watermarks to the FlinkKafkaConsumer. After that, it makes no difference to the result, still can not make the watermark to advance.

Overall workflow:

Two Kafka topics -> two data streams in Flink -> join them and convert to a retract stream -> KeyedProcessFunction that registers event-time timers and handles them in onTimer() -> downstream sink.

I don't think there is any issue with my syntax, but I still could NOT make the event-time watermark advance using the bounded-out-of-orderness strategy. (In a Flink cluster the behavior is different: the watermark advances, but onTimer() is still not triggered correctly.) :(

My guess at the reason: I receive 8 records for each round of onPeriodicEmit(), and only one of the eight is updated under the BoundedOutOfOrderness strategy. Under the timelag strategy (see the first email in the thread), all eight are updated, so the watermark advances. I just don't know why I get 8 records every time even though I set the parallelism to 1. (The logs are in the first email in the thread.)
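
One possible reading of those eight log lines, offered as an assumption rather than a confirmed diagnosis: the watermark generator may be running as eight parallel subtasks (setParallelism(1) at the end of an operator chain applies only to the last operator, and a local environment can default to the number of CPU cores). An operator with several input channels takes the minimum watermark over all of them, so one advancing subtask would not be enough. The plain-Java sketch below does not use Flink; the class and method names are hypothetical and it models only the minimum rule, with values taken from the logs.

```java
import java.util.Arrays;

/** Toy model (not Flink code): an operator's watermark is the minimum over its input channels. */
public class MinWatermarkSketch {

    /** Combine per-channel watermarks by taking the minimum; an empty input yields Long.MIN_VALUE. */
    static long combinedWatermark(long[] channelWatermarks) {
        return Arrays.stream(channelWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Event-based case from the logs: one subtask saw data, seven stayed at the initial value 0.
        long[] eventBased = new long[8];
        eventBased[1] = 1605047172881L;
        System.out.println("event-based: " + combinedWatermark(eventBased)); // prints 0

        // Time-lag case: every subtask follows the system clock, so the minimum advances as well.
        long[] timeLag = new long[8];
        Arrays.fill(timeLag, 1605047266198L);
        timeLag[1] = 1605047266199L;
        System.out.println("time-lag: " + combinedWatermark(timeLag)); // prints 1605047266198
    }
}
```

If this reading applies, subtasks that never receive data would keep holding the combined watermark back until they are marked idle or the generator runs with a single subtask.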

I also tried debugging in the Flink web interface, following this page: https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/debugging_event_time.html

The logs produced by the local Flink cluster differ from those I get when starting my application directly. Why is the behavior inconsistent? The context watermark sticks to Long.MIN_VALUE when debugging in the IDE; in the Flink cluster it is updated correctly, except that the first record sees the default value. But I am still not getting the scheduled logic triggered correctly inside the onTimer() method. My test workflow can be seen in the attachment. I have read through previous archive threads about the watermark not updating (sticking to Long.MIN_VALUE), but they did not help much in my case. Thanks.
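
For reference, an event-time timer fires only once the operator's watermark reaches the timer's timestamp, so a watermark that stays at Long.MIN_VALUE means onTimer() is never called. A minimal plain-Java sketch of that rule follows. It is not Flink's implementation; the class and method names are hypothetical, and the 10-second timer offset is an arbitrary value chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Toy model (not Flink code): a timer fires once the watermark reaches its timestamp. */
public class EventTimeTimerSketch {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private long currentWatermark = Long.MIN_VALUE;

    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    /** Advance the watermark (it never moves backwards) and return the timers that fire. */
    List<Long> advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek() <= currentWatermark) {
            fired.add(timers.poll());
        }
        return fired;
    }

    public static void main(String[] args) {
        EventTimeTimerSketch service = new EventTimeTimerSketch();
        // Hypothetical timer: 10 seconds after the largest event timestamp seen in the logs.
        service.registerEventTimeTimer(1605047187881L + 10_000L);

        System.out.println(service.advanceWatermark(Long.MIN_VALUE));  // prints []
        System.out.println(service.advanceWatermark(1605047172881L)); // prints []
        System.out.println(service.advanceWatermark(1605047200000L)); // prints [1605047197881]
    }
}
```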


Best,

Fuyao



On 11/11/20 11:33, [hidden email] wrote:

Hi Community,


Regarding this problem, could someone give me an explanation? Thanks.

Best,

Fuyao

On 11/10/20 16:56, [hidden email] wrote:

Hi Kevin,

Sorry for the name typo...

On 11/10/20 16:48, [hidden email] wrote:

Hi Kavin,

Thanks for your example. I think I have already done something very similar. I didn't post the full WatermarkStrategy implementation in my previous email, but I do have that part already. I think the example you gave is a punctuated watermark strategy, not a bounded-out-of-orderness one. My main concerns now are why my emitted watermark is not available in processElement(), and why I get 8 records each time the code reaches onPeriodicEmit(). I will post my code, following your example, below.

The symptom is that the context watermark is Long.MIN_VALUE when I use the watermark strategy below.

16:35:12,969 INFO  org.myorg.quickstart.processor.TableOutputProcessFunction     - context current key: 69215, context current watermark: -9223372036854775808


DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(table, Row.class);
retractStream
    .assignTimestampsAndWatermarks(new PunctuatedWatermarkStrategy())
    .keyBy(
        value -> {
          String invoice_id_key = (String) value.f1.getField(0);
          if (invoice_id_key == null) {
            invoice_id_key = (String) value.f1.getField(4);
          }
          return invoice_id_key;
        })
    .process(new TableOutputProcessFunction())
    .name("ProcessTableOutput")
    .uid("ProcessTableOutput")
    .addSink(businessObjectSink)
    .name("businessObjectSink")
    .uid("businessObjectSink")
    .setParallelism(1);

watermark strategy:

public class PunctuatedWatermarkStrategy implements WatermarkStrategy<Tuple2<Boolean, Row>> {
    @Override
    public WatermarkGenerator<Tuple2<Boolean, Row>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return new PunctuatedTableOutputWatermarkGenerator();
    }

    @Override
    public TimestampAssigner<Tuple2<Boolean, Row>> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        log.info("Inside timestamp assigner");
        return (booleanRowTuple2, previousElementTimestamp) -> {
            return my timestamp;
        };
    }
}

watermark generator code:

public class PunctuatedTableOutputWatermarkGenerator implements WatermarkGenerator<Tuple2<Boolean, Row>> {
    @Override
    public void onEvent(Tuple2<Boolean, Row> booleanRowTuple2, long eventTimestamp, WatermarkOutput watermarkOutput) {
        watermarkOutput.emitWatermark(new Watermark(eventTimestamp));
        log.info("Emit Punctuated watermark: {}", eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
        // don't need to do anything because we emit in reaction to events above
    }
}

16:35:13,584 INFO  org.myorg.quickstart.operator.PunctuatedTableOutputWatermarkGenerator  - Emit Punctuated watermark: 1605054900905

From the log, I can see that it extracts the eventTimestamp and emits the watermark. Why can't I access this information in the processElement() function?

Any suggestions? Thank you so much!


Best regards,

Fuyao



On 11/10/20 16:04, Kevin Kwon wrote:
Hi Fuyao, I think you need to implement your own WatermarkStrategy class and register it with window.assignTimestampsAndWatermarks(new YourEventWatermarkStrategy()).

Make sure you use KafkaConsumer's assignTimestampsAndWatermarks if you're using Kafka consumers.

Example code for a booking event that carries its own internal timestamp would be:

public class BookingWatermarkStrategy implements WatermarkStrategy<Booking> {

  @Override
  public WatermarkGenerator<Booking> createWatermarkGenerator(
      WatermarkGeneratorSupplier.Context context
  ) {
    return new WatermarkGenerator<Booking>() {
      private final long OUT_OF_ORDERNESS_MILLIS = 30;
      private long currentMaxTimestamp = Long.MIN_VALUE + OUT_OF_ORDERNESS_MILLIS + 1;

      @Override
      public void onEvent(Booking bookingEvent, long eventTimestamp, WatermarkOutput output) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, bookingEvent.getTimestamp());
        Watermark watermark = new Watermark(currentMaxTimestamp - OUT_OF_ORDERNESS_MILLIS - 1);
        output.emitWatermark(watermark);
      }

      @Override
      public void onPeriodicEmit(WatermarkOutput output) {
        // Do nothing since watermark will be emitted every event
      }
    };
  }

  @Override
  public TimestampAssigner<Booking> createTimestampAssigner(
      TimestampAssignerSupplier.Context context
  ) {
    return (booking, recordTimestamp) -> booking.getTimestamp();
  }
}
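
The arithmetic in the generator above can be checked in isolation. The sketch below is plain Java without Flink, and its class and method names are hypothetical. It tracks the largest timestamp seen and reports a watermark of that maximum minus the allowed out-of-orderness minus 1, starting from a value that keeps the subtraction from underflowing.

```java
/** Toy model (not Flink code) of the bounded-out-of-orderness arithmetic. */
public class BoundedOutOfOrdernessSketch {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Start high enough that the subtraction in currentWatermark() cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    /** Remember the largest event timestamp seen so far; late events do not lower it. */
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Events up to outOfOrdernessMillis older than the maximum are still expected. */
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch generator = new BoundedOutOfOrdernessSketch(30);
        System.out.println(generator.currentWatermark()); // prints -9223372036854775808

        generator.onEvent(1000L);
        System.out.println(generator.currentWatermark()); // prints 969

        generator.onEvent(900L); // an out-of-order event leaves the watermark unchanged
        System.out.println(generator.currentWatermark()); // prints 969
    }
}
```

Before any event arrives the reported watermark is exactly Long.MIN_VALUE, which is the value the process function logged earlier in this thread.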
