I am new to Flink and am trying to apply windowing. My source is Kafka, and my
model does not contain event-time information, so I am trying to use the Kafka
record timestamps with the assignTimestampsAndWatermarks() method. I implemented
two timestamp assigners, shown below.

    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class TimestampAssigner1 implements AssignerWithPeriodicWatermarks<String> {

        protected Logger logger = LoggerFactory.getLogger(getClass());

        private static final long serialVersionUID = 1L;

        private long currentMaxTimestamp;
        private final long maxOutOfOrderness = 3500; // 3.5 seconds

        @Override
        public long extractTimestamp(String element, long previousElementTimestamp) {
            // previousElementTimestamp is the timestamp attached by the Kafka source
            currentMaxTimestamp = Math.max(previousElementTimestamp, currentMaxTimestamp);
            logger.info(String.format("TimestampAssigner1 - currentMaxTimestamp : %s res : %s, element : %s ",
                    currentMaxTimestamp, previousElementTimestamp, element));
            return previousElementTimestamp;
        }

        @Override
        public Watermark getCurrentWatermark() {
            Watermark watermarkRes = new Watermark(currentMaxTimestamp - maxOutOfOrderness);
            //Watermark watermarkRes = new Watermark(currentMaxTimestamp);
            //Watermark watermarkRes = new Watermark(System.currentTimeMillis());
            //logger.info(String.format("watermarkRes : %s , this : %s ", watermarkRes, this));
            return watermarkRes;
        }
    }

    public class TimestampAssigner2 implements AssignerWithPeriodicWatermarks<String> {

        protected Logger logger = LoggerFactory.getLogger(getClass());

        private static final long serialVersionUID = 1L;

        private long currentMaxTimestamp;
        private final long maxOutOfOrderness = 3500; // 3.5 seconds

        @Override
        public long extractTimestamp(String element, long previousElementTimestamp) {
            currentMaxTimestamp = Math.max(previousElementTimestamp, currentMaxTimestamp);
            logger.info(String.format("TimestampAssigner2 - currentMaxTimestamp : %s res : %s, element : %s ",
                    currentMaxTimestamp, previousElementTimestamp, element));
            return previousElementTimestamp;
        }

        @Override
        public Watermark getCurrentWatermark() {
            //Watermark watermarkRes = new Watermark(currentMaxTimestamp - maxOutOfOrderness);
            //Watermark watermarkRes = new Watermark(currentMaxTimestamp);
            Watermark watermarkRes = new Watermark(System.currentTimeMillis());
            //logger.info(String.format("watermarkRes : %s , this : %s ", watermarkRes, this));
            return watermarkRes;
        }
    }

This is what I observe:

The first one (TimestampAssigner1) fails to progress if there are no new
elements from the Kafka source. I can verify this behavior: the element is
received, but the window does not complete in the absence of new elements.

The second one (TimestampAssigner2) seems to progress well, but as far as I
understand, since it uses system time for the watermark, delayed elements will
not be processed because they will not be included in any window.

What is the proper way to handle this situation? My requirement is to process
all events in a timely manner.

Regards
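
To make both observations concrete, here is a minimal plain-Java sketch of the arithmetic involved. It does not use Flink at all. It assumes a tumbling event-time window of 5 seconds and zero allowed lateness, neither of which is stated in the post, and the class and method names (WatermarkSketch, windowFires, isLate) are hypothetical. It relies only on the rule that an event-time window fires once the watermark reaches the last timestamp belonging to that window.

```java
/** Plain-Java sketch of the watermark arithmetic; no Flink dependency. */
final class WatermarkSketch {

    // Same out-of-orderness bound as in both assigners in the post.
    static final long MAX_OUT_OF_ORDERNESS = 3500L;

    // Hypothetical window size; the post does not state one.
    static final long WINDOW_SIZE = 5000L;

    /** Start of the tumbling window containing the timestamp (zero offset, non-negative timestamps). */
    static long windowStart(long timestamp) {
        return timestamp - (timestamp % WINDOW_SIZE);
    }

    /** Largest timestamp that still belongs to the element's window. */
    static long windowMaxTimestamp(long timestamp) {
        return windowStart(timestamp) + WINDOW_SIZE - 1;
    }

    /** Watermark as computed by TimestampAssigner1. */
    static long boundedWatermark(long currentMaxTimestamp) {
        return currentMaxTimestamp - MAX_OUT_OF_ORDERNESS;
    }

    /** The element's window fires once the watermark reaches the window's max timestamp. */
    static boolean windowFires(long elementTimestamp, long watermark) {
        return watermark >= windowMaxTimestamp(elementTimestamp);
    }

    /** With zero allowed lateness (an assumption), an element whose window is already behind the watermark is dropped. */
    static boolean isLate(long elementTimestamp, long watermark) {
        return windowMaxTimestamp(elementTimestamp) <= watermark;
    }

    public static void main(String[] args) {
        long lastElement = 12_000L; // falls into window [10000, 15000)

        // TimestampAssigner1, no further input: watermark stays at 8500, so the window never fires.
        System.out.println(windowFires(lastElement, boundedWatermark(lastElement))); // false

        // TimestampAssigner1, a later element at 18500 arrives: watermark becomes 15000, window fires.
        System.out.println(windowFires(lastElement, boundedWatermark(18_500L))); // true

        // TimestampAssigner2, wall clock at 20000: an element stamped 12000 arrives too late.
        System.out.println(isLate(lastElement, 20_000L)); // true
    }
}
```

With TimestampAssigner1 the watermark can only move when a larger timestamp is seen, so the last open window waits for the next record. With TimestampAssigner2 the watermark follows the wall clock, so any record whose window ended before "now" is treated as late.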