public class MyTimestampExtractor implements AssignerWithPeriodicWatermarks { private static final long serialVersionUID = 1L; /** The current maximum timestamp seen so far. */ private long currentMaxTimestamp; /** The timestamp of the last emitted watermark. */ private long lastEmittedWatermark = Long.MIN_VALUE; /** * The (fixed) interval between the maximum seen timestamp seen in the records * and that of the watermark to be emitted. */ private final long maxOutOfOrderness; private long lastUpdateTimestamp = Long.MAX_VALUE; private final long processingTimeWait = Time.minutes(3).toMilliseconds(); private boolean isStreamPaused = false; public MyTimestampExtractor(Time maxOutOfOrderness) { if(maxOutOfOrderness.toMilliseconds() < 0) { throw new RuntimeException("Tried to set the maximum allowed " + "lateness to "+ maxOutOfOrderness +". This parameter cannot be negative."); } this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds(); this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness; } public long getMaxOutOfOrdernessInMillis() { return maxOutOfOrderness; } @Override public final Watermark getCurrentWatermark() { // this guarantees that the watermark never goes backwards. long potentialWM = currentMaxTimestamp - maxOutOfOrderness; long lastUpdatePlusDelay = lastUpdateTimestamp + processingTimeWait; // make sure we don't overflow if (lastUpdatePlusDelay < lastUpdateTimestamp) { lastUpdatePlusDelay = Long.MAX_VALUE; if(isStreamPaused) { lastEmittedWatermark = Long.MIN_VALUE; isStreamPaused = false; } } if (potentialWM > lastEmittedWatermark) { lastEmittedWatermark = potentialWM; lastUpdateTimestamp = System.currentTimeMillis(); } else if (System.currentTimeMillis() > lastUpdatePlusDelay) { // if we didn't update the watermark for a while, emit the timestamp higher than the currentMaxTimestamp // to trigger the downstream windows // NOTE: this does not gurantee that all buffered elements are evicted // so set isStreamPaused to true long start = currentMaxTimestamp - (currentMaxTimestamp % 1000); long end = start + 1000; lastEmittedWatermark = end + 1; isStreamPaused = true; // reset to MAX so that we only do this once until we emit regular // watermarks again lastUpdateTimestamp = Long.MAX_VALUE; } return new Watermark(lastEmittedWatermark); } @Override public final long extractTimestamp(MyDTO element, long previousElementTimestamp) { long timestamp = element.getRequestTime(); if(timestamp > currentMaxTimestamp) { currentMaxTimestamp = timestamp; } return timestamp; } }