Re: Flink CEP not emitting timed out events properly
Posted by Biplob Biswas on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Flink-CEP-not-emitting-timed-out-events-properly-tp13794p13860.html
Hi Kostas,
Yes, I have a flag in my timestampextractor.
As you can see from the code below, I am checking whether
currentTime - systemTimeSinceLastModification > 10 sec..... as new events come then the watermark wouldn't be incremented. But as soon as I have a difference of more than 10 seconds, I am incrementing the watermark by 1 sec, I feel this is very small and I would try incrementing the watermark with a higher value but yeah this is what I am doing.
public class TimestampAndWatermarkGenerator implements AssignerWithPeriodicWatermarks<BAMEvent>{
private final long maxOutOfOrderness = 10000; // 10 seconds
private long currentMaxTimestamp;
private long systemTimeSinceLastModification;
private boolean firstEventFlag = false;
private Logger log = LoggerFactory.getLogger(TimestampAndWatermarkGenerator.class);
@Nullable
@Override
public Watermark getCurrentWatermark() {
long currentTime = System.currentTimeMillis();
if(firstEventFlag && (currentTime - systemTimeSinceLastModification > 10000)){
systemTimeSinceLastModification = currentTime;
currentMaxTimestamp = currentMaxTimestamp + 1000;
//log.info("Current Max Time - {}, Last Modification Time - {}", currentMaxTimestamp, systemTimeSinceLastModification );
}
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
@Override
public long extractTimestamp(BAMEvent bamEvent, long l) {
long timestamp = bamEvent.getTimestamp();
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
systemTimeSinceLastModification = System.currentTimeMillis();
firstEventFlag = true;
//log.info("Current Max Time - {}, Current Event Time - {}", currentMaxTimestamp, systemTimeSinceLastModification);
return timestamp;
}
}