Apache Flink - event-time-based watermark generators - optimal strategy

simpleusr
I am a Flink newbie trying to apply windowing. My source is Kafka and my
model does not contain event-time information, so I am trying to use the Kafka
record timestamps with the assignTimestampsAndWatermarks() method.

I implemented two timestamp assigners as below.

public class TimestampAssigner1 implements AssignerWithPeriodicWatermarks<String> {
    protected Logger logger = LoggerFactory.getLogger(getClass());

    private static final long serialVersionUID = 1L;
    private long currentMaxTimestamp;
    private final long maxOutOfOrderness = 3500; // 3.5 seconds

    @Override
    public long extractTimestamp(String element, long previousElementTimestamp) {
        currentMaxTimestamp = Math.max(previousElementTimestamp, currentMaxTimestamp);
        logger.info(String.format(
                "TimestampAssigner1 - currentMaxTimestamp : %s res : %s, element : %s ",
                currentMaxTimestamp, previousElementTimestamp, element));
        return previousElementTimestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        Watermark watermarkRes = new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        //Watermark watermarkRes = new Watermark(currentMaxTimestamp);
        //Watermark watermarkRes = new Watermark(System.currentTimeMillis());
        //logger.info(String.format("watermarkRes : %s , this : %s ", watermarkRes, this));
        return watermarkRes;
    }
}


public class TimestampAssigner2 implements AssignerWithPeriodicWatermarks<String> {
    protected Logger logger = LoggerFactory.getLogger(getClass());

    private static final long serialVersionUID = 1L;
    private long currentMaxTimestamp;
    private final long maxOutOfOrderness = 3500; // 3.5 seconds

    @Override
    public long extractTimestamp(String element, long previousElementTimestamp) {
        currentMaxTimestamp = Math.max(previousElementTimestamp, currentMaxTimestamp);
        logger.info(String.format(
                "TimestampAssigner2 - currentMaxTimestamp : %s res : %s, element : %s ",
                currentMaxTimestamp, previousElementTimestamp, element));
        return previousElementTimestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        //Watermark watermarkRes = new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        //Watermark watermarkRes = new Watermark(currentMaxTimestamp);
        Watermark watermarkRes = new Watermark(System.currentTimeMillis());
        //logger.info(String.format("watermarkRes : %s , this : %s ", watermarkRes, this));
        return watermarkRes;
    }
}

This is what I observe: the first one (TimestampAssigner1) fails to make
progress if no new elements arrive from the Kafka source. I can verify this
behavior: an element is received, but its window does not complete in the
absence of new elements, because the watermark only advances when
currentMaxTimestamp does. The second one (TimestampAssigner2) seems to
progress well but, as far as I understand, since it uses system time, delayed
elements will not be processed because their windows will already have closed.
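
Both observations can be checked with simple arithmetic. The sketch below is plain Java with no Flink dependency; the class and method names are hypothetical, and it assumes 5-second tumbling event-time windows with no allowed lateness, where a window [start, end) fires once the watermark reaches end - 1 and counts as late from that point on.

```java
// Plain-Java model of the two watermark strategies; no Flink classes are used.
// All names here are hypothetical and exist only for illustration.
final class WatermarkStallDemo {
    static final long MAX_OUT_OF_ORDERNESS_MS = 3500; // same bound as in the post

    // Strategy 1: the watermark trails the largest timestamp seen so far.
    static long boundedWatermark(long currentMaxTimestamp) {
        return currentMaxTimestamp - MAX_OUT_OF_ORDERNESS_MS;
    }

    // Assumption: a window [start, end) may fire once watermark >= end - 1.
    static boolean windowCanFire(long windowEndExclusive, long watermark) {
        return watermark >= windowEndExclusive - 1;
    }

    // Assumption: with zero allowed lateness, the same condition marks the
    // window as late, so elements arriving for it afterwards are dropped.
    static boolean isWindowLate(long windowEndExclusive, long watermark) {
        return windowEndExclusive - 1 <= watermark;
    }

    public static void main(String[] args) {
        long windowEnd = 5000; // window [0, 5000)

        // Strategy 1: the last element seen has timestamp 4000, then the source goes quiet.
        long wm1 = boundedWatermark(4000);
        System.out.println("strategy 1 watermark = " + wm1
                + ", window fires = " + windowCanFire(windowEnd, wm1));

        // Smallest later timestamp that would finally release the window.
        long needed = windowEnd - 1 + MAX_OUT_OF_ORDERNESS_MS;
        System.out.println("needs an element with timestamp >= " + needed);

        // Strategy 2: wall clock is 10000 when a delayed element (timestamp 4000) arrives.
        long wm2 = 10000;
        System.out.println("strategy 2 drops the delayed element = "
                + isWindowLate(windowEnd, wm2));
    }
}
```

Under these assumptions, the first strategy waits for a later element before the window can close, while the second has already passed the window by the time the delayed element shows up.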

What is the proper way to handle this situation? My requirement is to
process all events in a timely manner.
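
One possible middle ground, offered only as a sketch and not as an established Flink recipe, is to keep the bounded out-of-orderness watermark while elements are flowing and let it creep forward at wall-clock speed once the source has been quiet for longer than some timeout. The class below is plain Java with hypothetical names (IdleAwareWatermarkSketch, idleTimeoutMs); the current time is passed in as a parameter so the logic can be tested, whereas inside extractTimestamp() and getCurrentWatermark() it would presumably come from System.currentTimeMillis().

```java
// Hypothetical idle-aware watermark logic; plain Java, no Flink dependency.
final class IdleAwareWatermarkSketch {
    private final long maxOutOfOrdernessMs;
    private final long idleTimeoutMs;

    private long currentMaxTimestamp = Long.MIN_VALUE;  // largest event timestamp seen
    private long lastEventWallClockMs = Long.MIN_VALUE; // wall-clock time of the last element
    private long lastWatermark = Long.MIN_VALUE;        // keeps the watermark monotonic

    IdleAwareWatermarkSketch(long maxOutOfOrdernessMs, long idleTimeoutMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.idleTimeoutMs = idleTimeoutMs;
    }

    // Would play the role of extractTimestamp(): record the element and return its timestamp.
    long onEvent(long eventTimestampMs, long nowMs) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestampMs);
        lastEventWallClockMs = nowMs;
        return eventTimestampMs;
    }

    // Would play the role of getCurrentWatermark().
    long currentWatermark(long nowMs) {
        if (currentMaxTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // nothing seen yet, so no progress can be claimed
        }
        long idleForMs = nowMs - lastEventWallClockMs;
        // Time spent idle beyond the timeout is added to the usual watermark.
        long idleBonusMs = Math.max(0, idleForMs - idleTimeoutMs);
        long candidate = currentMaxTimestamp - maxOutOfOrdernessMs + idleBonusMs;
        // Never move backwards, even if a late element resets the idle clock.
        lastWatermark = Math.max(lastWatermark, candidate);
        return lastWatermark;
    }

    public static void main(String[] args) {
        IdleAwareWatermarkSketch sketch = new IdleAwareWatermarkSketch(3500, 1000);
        sketch.onEvent(4000, 4000);
        System.out.println("right after the element: " + sketch.currentWatermark(4000));
        System.out.println("after a long quiet period: " + sketch.currentWatermark(9499));
    }
}
```

The trade-off does not disappear: once the watermark has advanced during an idle period, an element that arrives afterwards with an older timestamp is late in the same way as with TimestampAssigner2, so the timeout effectively becomes an extra bound on how much delay is tolerated.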

Regards



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/