Re: BoundedOutOfOrderness Watermark Generator is NOT making the event time to advance

Posted by Fuyao Li-2
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/BoundedOutOfOrderness-Watermark-Generator-is-NOT-making-the-event-time-to-advance-tp39346p39388.html

Hi Community,


Regarding this problem, could someone give me an explanation? Thanks.

Best,

Fuyao

On 11/10/20 16:56, [hidden email] wrote:

Hi Kevin,

Sorry for the name typo...

On 11/10/20 16:48, [hidden email] wrote:

Hi Kavin,

Thanks for your example. I think I have already done something very similar. I didn't post my full WatermarkStrategy implementation in my previous email, but I do have that part. The example you gave is a punctuated watermark strategy, not a bounded-out-of-orderness one. My main concerns now are why my emitted watermark is not visible in processElement(), and why I get 8 log records each time the code reaches onPeriodicEmit(). Following your example, I have posted my code below.

The symptom is that the context watermark is Long.MIN_VALUE when I use the watermark strategy below:

16:35:12,969 INFO  org.myorg.quickstart.processor.TableOutputProcessFunction     - context current key: 69215, context current watermark: -9223372036854775808


DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(table, Row.class);
retractStream
    .assignTimestampsAndWatermarks(new PunctuatedWatermarkStrategy())
    .keyBy(
        value -> {
          String invoice_id_key = (String) value.f1.getField(0);
          if (invoice_id_key == null) {
            invoice_id_key = (String) value.f1.getField(4);
          }
          return invoice_id_key;
        })
    .process(new TableOutputProcessFunction())
    .name("ProcessTableOutput")
    .uid("ProcessTableOutput")
    .addSink(businessObjectSink)
    .name("businessObjectSink")
    .uid("businessObjectSink")
    .setParallelism(1);
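A possible reading of the Long.MIN_VALUE above, offered as an assumption rather than a confirmed diagnosis: in this pipeline setParallelism(1) applies only to the sink, so assignTimestampsAndWatermarks presumably runs at the job's default parallelism, with one generator instance per subtask. An operator with several input channels keeps the latest watermark per channel and uses the minimum, and a punctuated generator on a subtask that never receives records never emits anything. The plain-Java model below (CombinedWatermarkModel is a hypothetical name, not a Flink class) shows one active subtask plus seven silent ones producing exactly the logged value.

```java
import java.util.Arrays;

/**
 * Hypothetical plain-Java model (not Flink's API) of how an operator with
 * several input channels derives its current watermark: it remembers the
 * highest watermark seen on each channel and uses the minimum.
 */
class CombinedWatermarkModel {
    private final long[] perChannel;

    CombinedWatermarkModel(int channels) {
        perChannel = new long[channels];
        // Channels that have not delivered a watermark yet start at Long.MIN_VALUE.
        Arrays.fill(perChannel, Long.MIN_VALUE);
    }

    /** Records a watermark arriving on one channel; watermarks never move backwards. */
    void onWatermark(int channel, long watermark) {
        perChannel[channel] = Math.max(perChannel[channel], watermark);
    }

    /** The operator's watermark is the minimum over all channels. */
    long currentWatermark() {
        long min = Long.MAX_VALUE;
        for (long wm : perChannel) {
            min = Math.min(min, wm);
        }
        return min;
    }

    public static void main(String[] args) {
        // 8 upstream generator instances, but only subtask 0 receives records,
        // so only it emits punctuated watermarks.
        CombinedWatermarkModel keyedOperator = new CombinedWatermarkModel(8);
        keyedOperator.onWatermark(0, 1605054900905L);
        System.out.println(keyedOperator.currentWatermark()); // prints -9223372036854775808
    }
}
```

If every channel eventually delivers a watermark, the minimum starts to move, which is why the parallelism of the assigner is worth checking first.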

watermark strategy:

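import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;

@Slf4j
public class PunctuatedWatermarkStrategy implements WatermarkStrategy<Tuple2<Boolean, Row>> {
    // Hypothetical: the post elides which Row field holds the event time ("my timestamp").
    // Set this to that column's index (epoch millis as Long); fields 0 and 4 are the invoice ids used in keyBy().
    private static final int EVENT_TIME_FIELD = 1;

    @Override
    public WatermarkGenerator<Tuple2<Boolean, Row>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return new PunctuatedTableOutputWatermarkGenerator();
    }

    @Override
    public TimestampAssigner<Tuple2<Boolean, Row>> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        log.info("Inside timestamp assigner");
        // f1 is the Row payload of the retract stream; the assigner must return epoch millis.
        return (booleanRowTuple2, previousElementTimestamp) -> (Long) booleanRowTuple2.f1.getField(EVENT_TIME_FIELD);
    }
}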

watermark generator code:

@Slf4j
public class PunctuatedTableOutputWatermarkGenerator implements WatermarkGenerator<Tuple2<Boolean, Row>> {
    @Override
    public void onEvent(Tuple2<Boolean, Row> booleanRowTuple2, long eventTimestamp, WatermarkOutput watermarkOutput) {
        watermarkOutput.emitWatermark(new Watermark(eventTimestamp));
        log.info("Emit Punctuated watermark: {}", eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
        // don't need to do anything because we emit in reaction to events above
    }
}

16:35:13,584 INFO  org.myorg.quickstart.operator.PunctuatedTableOutputWatermarkGenerator  - Emit Punctuated watermark: 1605054900905

From the log, I can see that it extracts the eventTimestamp and emits the watermark. Why can't I access this information in the processElement() function?
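Even with a single generator instance, the ordering inside the assigner matters. As far as I can tell from TimestampsAndWatermarksOperator in Flink 1.11 (the class linked later in this thread), each record is forwarded downstream before onEvent() is called, so a watermark emitted in onEvent() reaches processElement() only after the record that triggered it. The model below is a hypothetical single-channel simplification (EmitOrderModel is not a Flink class) that records what processElement() would observe for each record.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical single-channel model: the upstream assigner forwards each
 * record first and only then lets the generator emit a watermark, so the
 * downstream function observes the watermark produced by the previous record.
 */
class EmitOrderModel {
    private long downstreamWatermark = Long.MIN_VALUE;
    // What the downstream processElement() saw for each record, in order.
    final List<Long> observedInProcessElement = new ArrayList<>();

    void assignAndForward(long eventTimestamp) {
        // 1. The record is forwarded: downstream processElement() runs now.
        observedInProcessElement.add(downstreamWatermark);
        // 2. Then the punctuated generator's onEvent() emits a watermark.
        downstreamWatermark = Math.max(downstreamWatermark, eventTimestamp);
    }

    public static void main(String[] args) {
        EmitOrderModel model = new EmitOrderModel();
        model.assignAndForward(1000L);
        model.assignAndForward(2000L);
        System.out.println(model.observedInProcessElement); // prints [-9223372036854775808, 1000]
    }
}
```

In this model the first record always sees the initial watermark, and each later record sees the watermark of its predecessor, so currentWatermark() lags by one record even when everything else is wired correctly.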

Any suggestions? Thank you so much!


Best regards,

Fuyao



On 11/10/20 16:04, Kevin Kwon wrote:
Hi Fuyao, I think you need to implement your own WatermarkStrategy class and register it with window.assignTimestampsAndWatermarks(new YourEventWatermarkStrategy()).

If you're using Kafka consumers, make sure you use the KafkaConsumer's assignTimestampsAndWatermarks.

An example for a booking event that carries its own internal timestamp would be:

public class BookingWatermarkStrategy implements WatermarkStrategy<Booking> {

  @Override
  public WatermarkGenerator<Booking> createWatermarkGenerator(
      WatermarkGeneratorSupplier.Context context
  ) {
    return new WatermarkGenerator<Booking>() {
      private final long OUT_OF_ORDERNESS_MILLIS = 30;
      private long currentMaxTimestamp = Long.MIN_VALUE + OUT_OF_ORDERNESS_MILLIS + 1;

      @Override
      public void onEvent(Booking bookingEvent, long eventTimestamp, WatermarkOutput output) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, bookingEvent.getTimestamp());
        Watermark watermark = new Watermark(currentMaxTimestamp - OUT_OF_ORDERNESS_MILLIS - 1);
        output.emitWatermark(watermark);
      }

      @Override
      public void onPeriodicEmit(WatermarkOutput output) {
        // Do nothing since watermark will be emitted every event
      }
    };
  }

  @Override
  public TimestampAssigner<Booking> createTimestampAssigner(
      TimestampAssignerSupplier.Context context
  ) {
    return (booking, recordTimestamp) -> booking.getTimestamp();
  }
}
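The initial value of currentMaxTimestamp in the generator above is chosen so that the subtraction cannot overflow before the first event. Below is a plain-Java check of that arithmetic, using a hypothetical class name. To my knowledge Flink's built-in BoundedOutOfOrdernessWatermarks uses the same max - outOfOrderness - 1 formula but emits from onPeriodicEmit() rather than on every event, which is the periodic-versus-punctuated difference raised in the reply above.

```java
/**
 * Hypothetical plain-Java model of the bounded-out-of-orderness arithmetic:
 * watermark = maxTimestamp - outOfOrderness - 1. Starting the maximum at
 * Long.MIN_VALUE + outOfOrderness + 1 makes the pre-event watermark exactly
 * Long.MIN_VALUE instead of wrapping around to a huge positive value.
 */
class BoundedOutOfOrdernessMath {
    private final long outOfOrdernessMillis;
    private long currentMaxTimestamp;

    BoundedOutOfOrdernessMath(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        this.currentMaxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    void onEvent(long eventTimestamp) {
        // Out-of-order events never move the maximum backwards.
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    long currentWatermark() {
        return currentMaxTimestamp - outOfOrdernessMillis - 1;
    }

    /** What happens without the guard: starting at Long.MIN_VALUE wraps around. */
    static long naiveInitialWatermark(long outOfOrdernessMillis) {
        long max = Long.MIN_VALUE;
        return max - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessMath gen = new BoundedOutOfOrdernessMath(30);
        System.out.println(gen.currentWatermark());      // -9223372036854775808
        System.out.println(naiveInitialWatermark(30));   // 9223372036854775777
        gen.onEvent(1000);
        gen.onEvent(990);
        System.out.println(gen.currentWatermark());      // 969
    }
}
```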

On Wed, Nov 11, 2020 at 12:28 AM <[hidden email]> wrote:
Hi Experts,

I am trying to implement a KeyedProcessFunction with an onTimer() callback. I need to use event time, and I have problems making the watermark available to my operator; I see some strange behavior.

I have a joined retract stream without watermark or timestamp information, and I need to assign timestamps and watermarks to it. The timestamp is just a field in the stream. The problems are with the watermark generator part.

Problems:

1. I can use a time-lag watermark generator and make it work. But with a BoundedOutOfOrderness generator, context.timerService().currentWatermark() in processElement() always sticks to the initial value and never updates.

2. I set the auto-watermark interval to 5 seconds for debugging, and I attach this watermark generator in only one place, with parallelism 1. However, I get 8 records at a time. The time-lag policy advances all 8 records; the out-of-orderness policy advances only 1 of them. Maybe this mismatch is causing processElement() to see the wrong default watermark?
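The eight log lines per tick are consistent with eight parallel instances of the generator, each getting its own onPeriodicEmit() call every auto-watermark interval; that is an inference from the log, not something the post confirms. Under that assumption, the hypothetical model below replays the numbers from the log further down: only one instance has seen events (currentMaxTimestamp 1605047187881), while the others keep the initial 15000. Taking the minimum, as a downstream operator fed by all instances would, shows why policy 1 (wall-clock based) advances while policy 2 stays at 0.

```java
import java.util.Arrays;

/**
 * Hypothetical plain-Java model (not Flink code) of eight parallel copies of
 * PeriodicTableOutputWatermarkGenerator where only one copy receives records.
 */
class EightInstancesModel {
    static final long MAX_TIME_LAG = 15000;

    // Policy 1: wall-clock based, so it advances even on instances without events.
    static long policy1(long nowMillis, long currentMaxTimestamp) {
        return Math.max(nowMillis - MAX_TIME_LAG, currentMaxTimestamp);
    }

    // Policy 2: driven only by the max event timestamp this instance has seen.
    static long policy2(long currentMaxTimestamp) {
        return currentMaxTimestamp - MAX_TIME_LAG;
    }

    // A downstream operator fed by all instances uses the minimum watermark.
    static long combined(long[] perInstance) {
        long min = Long.MAX_VALUE;
        for (long wm : perInstance) {
            min = Math.min(min, wm);
        }
        return min;
    }

    public static void main(String[] args) {
        // Wall-clock reading 15 s after the first logged "system time" watermark.
        long now = 1605047281198L;
        long[] maxTimestamps = new long[8];
        Arrays.fill(maxTimestamps, 15000L);  // initial value in the generator
        maxTimestamps[1] = 1605047187881L;   // the only instance that saw events

        long[] p1 = new long[8];
        long[] p2 = new long[8];
        for (int i = 0; i < 8; i++) {
            p1[i] = policy1(now, maxTimestamps[i]);
            p2[i] = policy2(maxTimestamps[i]);
        }
        System.out.println("policy 1 combined: " + combined(p1)); // 1605047266198
        System.out.println("policy 2 combined: " + combined(p2)); // 0
    }
}
```

Under this model policy 1 masks the problem rather than fixing it: the seven idle instances advance on wall-clock time alone, so the combined watermark follows processing time, not event time.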

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator

This is my watermark generator code:

@Slf4j
public class PeriodicTableOutputWatermarkGenerator implements WatermarkGenerator<Tuple2<Boolean, Row>> {
    private final long maxTimeLag = 15000;
    private transient long currentMaxTimestamp = 15000;

    @Override
    public void onEvent(Tuple2<Boolean, Row> booleanRowTuple2, long eventTimestamp, WatermarkOutput output) {
        // the eventTimestamp is obtained through the TimestampAssigner, see
        // https://github.com/apache/flink/blob/f24cb3f3b7e773706188ae92998b3e1ffbf1829e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
        currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp);
        log.info("eventTimestamp in onEvent method: {}", eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Policy 1: time-lag strategy, works and advances the watermark
        long watermarkEpochTime = Math.max(System.currentTimeMillis() - maxTimeLag, currentMaxTimestamp);
        output.emitWatermark(new Watermark(watermarkEpochTime));

        // Policy 2: periodic emit based on the max event timestamp seen so far
        long periodicEmitWatermarkTime = currentMaxTimestamp - maxTimeLag;
        // output.emitWatermark(new Watermark(periodicEmitWatermarkTime));

        log.info("Emit Watermark: watermark based on system time: {}, periodicEmitWatermarkTime: {}, currentMaxTimestamp: {}",
                watermarkEpochTime, periodicEmitWatermarkTime, currentMaxTimestamp);
    }
}


This is the output of the slf4j logging above. Every time, it gives me 8 records. Why 8? I think it should be 1 in theory, and I am very confused. Also, policy 1 advances all 8 records, while policy 2 advances only 1 of the 8, and that is not reflected in processElement().
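If some generator instances genuinely receive no data, one general remedy is to leave idle inputs out of the minimum. The sketch below is a hypothetical plain-Java model of that idea only (IdleAwareCombiner is not Flink code). In Flink the related API is WatermarkStrategy#withIdleness(Duration), but whether it takes effect when the strategy is attached with DataStream#assignTimestampsAndWatermarks, as opposed to inside a source, may depend on the Flink version and should be verified. Giving the assigner its own setParallelism(1) is another option worth checking.

```java
import java.util.Arrays;

/**
 * Hypothetical plain-Java model of idleness handling: inputs marked idle are
 * left out of the minimum, so one active instance can drive the watermark.
 */
class IdleAwareCombiner {
    private final long[] watermarks;
    private final boolean[] idle;

    IdleAwareCombiner(int inputs) {
        watermarks = new long[inputs];
        idle = new boolean[inputs];
        Arrays.fill(watermarks, Long.MIN_VALUE);
    }

    void onWatermark(int input, long watermark) {
        watermarks[input] = Math.max(watermarks[input], watermark);
        idle[input] = false; // in this model, a new watermark makes the input active again
    }

    void markIdle(int input) {
        idle[input] = true;
    }

    /** Minimum over active inputs; in this model, Long.MIN_VALUE if all are idle. */
    long currentWatermark() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                min = Math.min(min, watermarks[i]);
                anyActive = true;
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        IdleAwareCombiner combiner = new IdleAwareCombiner(8);
        for (int i = 0; i < 8; i++) {
            combiner.onWatermark(i, 0L); // policy 2 value on instances without events
        }
        combiner.onWatermark(1, 1605047172881L); // the instance that sees events
        System.out.println(combiner.currentWatermark()); // 0
        for (int i = 0; i < 8; i++) {
            if (i != 1) {
                combiner.markIdle(i);
            }
        }
        System.out.println(combiner.currentWatermark()); // 1605047172881
    }
}
```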

14:28:01,199 INFO  org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO  org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266199, periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp: 1605047187881
14:28:01,199 INFO  org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266199, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO  org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO  org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO  org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO  org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO  org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000

14:28:06,200 INFO  org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO  org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO  org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp: 1605047187881
14:28:06,200 INFO  org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO  org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO  org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO  org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO  org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000


Any insights? Thank you very much!


Best,

Fuyao