Re: Flink CEP not emitting timed out events properly

Posted by Biplob Biswas on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Flink-CEP-not-emitting-timed-out-events-properly-tp13794p13860.html

Hi Kostas,

Yes, I have a flag in my timestamp extractor.

As you can see from the code below, I check whether currentTime - systemTimeSinceLastModification > 10 sec. As long as new events keep arriving, that check fails and the watermark is not advanced artificially. But as soon as the difference exceeds 10 seconds, I increment the watermark by 1 sec. I feel this step is very small, and I will try incrementing the watermark by a higher value, but this is what I am doing at the moment.


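import javax.annotation.Nullable;

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TimestampAndWatermarkGenerator implements AssignerWithPeriodicWatermarks<BAMEvent> {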

  private final long maxOutOfOrderness = 10000; // 10 seconds

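  // Highest event timestamp seen so far (advanced artificially while no events arrive).
  private long currentMaxTimestamp;
  // Wall-clock time of the last event or of the last artificial advance.
  private long systemTimeSinceLastModification;
  // Becomes true once the first event has been seen.
  private boolean firstEventFlag = false;
  private static final Logger log = LoggerFactory.getLogger(TimestampAndWatermarkGenerator.class);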

  @Nullable
  @Override
  public Watermark getCurrentWatermark() {
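    long currentTime = System.currentTimeMillis();
    // Once an event has been seen and none has arrived for more than 10 seconds
    // of wall-clock time, advance event time artificially by 1 second.
    if (firstEventFlag && (currentTime - systemTimeSinceLastModification > 10000)) {
      systemTimeSinceLastModification = currentTime;
      currentMaxTimestamp = currentMaxTimestamp + 1000;
      //log.info("Current Max Time - {}, Last Modification Time - {}", currentMaxTimestamp, systemTimeSinceLastModification );
    }
    // The watermark trails the highest timestamp by the allowed out-of-orderness.
    return new Watermark(currentMaxTimestamp - maxOutOfOrderness);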
  }

  @Override
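  public long extractTimestamp(BAMEvent bamEvent, long previousElementTimestamp) {
    long timestamp = bamEvent.getTimestamp();
    // Track the highest event timestamp seen so far.
    currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
    // Remember when the last event arrived so that idleness can be detected.
    systemTimeSinceLastModification = System.currentTimeMillis();
    firstEventFlag = true;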
    //log.info("Current Max Time - {}, Current Event Time - {}", currentMaxTimestamp, systemTimeSinceLastModification);

    return timestamp;
  }
}
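
To get a feeling for how slowly the watermark moves with a 1 sec step, here is a minimal sketch of the same logic without any Flink dependency, so it can be run on its own. The class name IdleWatermarkSketch, the constructor parameters and the explicit "now" argument (used in place of System.currentTimeMillis()) are my own assumptions for illustration and are not part of the Flink API; the 200 ms polling interval in main is likewise just an assumed value.

```java
// Stand-alone sketch of the idle-advance logic above. The wall clock is passed
// in explicitly so the behaviour can be checked deterministically.
// All names here are hypothetical and not part of Flink.
class IdleWatermarkSketch {
    private final long maxOutOfOrderness; // allowed lateness in ms
    private final long idleTimeout;       // wall-clock idle time before advancing, in ms
    private final long idleStep;          // event-time step applied per idle timeout, in ms

    private long currentMaxTimestamp;
    private long lastModification;
    private boolean seenEvent = false;

    IdleWatermarkSketch(long maxOutOfOrderness, long idleTimeout, long idleStep) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        this.idleTimeout = idleTimeout;
        this.idleStep = idleStep;
    }

    // Mirrors extractTimestamp: record the event time and the arrival time.
    void onEvent(long eventTimestamp, long now) {
        currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp);
        lastModification = now;
        seenEvent = true;
    }

    // Mirrors getCurrentWatermark: advance event time by one step after each idle timeout.
    long currentWatermark(long now) {
        if (seenEvent && now - lastModification > idleTimeout) {
            lastModification = now;
            currentMaxTimestamp += idleStep;
        }
        return currentMaxTimestamp - maxOutOfOrderness;
    }

    public static void main(String[] args) {
        // Same values as in the class above: 10 s lateness, 10 s idle timeout, 1 s step.
        IdleWatermarkSketch generator = new IdleWatermarkSketch(10_000L, 10_000L, 1_000L);
        generator.onEvent(50_000L, 0L); // a single event, then the source goes quiet

        long previous = Long.MIN_VALUE;
        // Poll periodically for one minute of simulated wall-clock time.
        for (long now = 0L; now <= 60_000L; now += 200L) {
            long watermark = generator.currentWatermark(now);
            if (watermark != previous) {
                System.out.println("wall clock " + now + " ms -> watermark " + watermark);
                previous = watermark;
            }
        }
    }
}
```

With these values the watermark gains only about one second of event time for every ten seconds without events, so a pattern with a longer "within" window would need a correspondingly long idle period before it is timed out. A larger idleStep shortens that wait.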