currentWatermark for Event Time is not increasing fast enough to go past the window.maxTimestamp

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

currentWatermark for Event Time is not increasing fast enough to go past the window.maxTimestamp

Vijay Balakrishnan
Hi,
Event Time Window: 15s
My currentWatermark for Event Time processing is not increasing fast enough to go past the window maxTimestamp.
I have reduced bound used for watermark calculation to just 10 ms.
I have increased the parallelInput to process input from Kinesis in parallel to 2 slots on my laptop.//env.addSource(kinesisConsumer).setParallelism(2);
For FlinkKinesisConsumer, I added a property from flink-1.8.0,
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS, 25);//this didn't seem to help

//in EventTimeTrigger.java: if (window.maxTimestamp() <= ctx.getCurrentWatermark()) Trigger.FIRE;
My event producer to Kinesis is producing at a delay of 2500 ms for each record.(business requirement).
What else can I do to consume data from Kinesis faster and cross the threshold for
currentWatermark to increase beyond the window.maxTimestamp faster ?

MonitoringTSWAssigner code:
public class MonitoringTSWAssigner implements AssignerWithPunctuatedWatermarks<Map<String, Object>> {
    private long bound = 5 * (long) 1000;//5 secs out of order bound in millisecs
    private long maxTimestamp = Long.MIN_VALUE;

    public MonitoringTSWAssigner() {
    }

    public MonitoringTSWAssigner(long bound) {
        this.bound = bound;
    }

    public long extractTimestamp(Map<String, Object> monitoring, long previousTS) {
        long extractedTS = getExtractedTS(monitoring);
        if (extractedTS > maxTimestamp) {
            maxTimestamp = extractedTS;
        }
   return extractedTS;
//return System.currentTimeMillis();
    }

    public long getExtractedTS(Map<String, Object> monitoring) {
        final String eventTimestamp = monitoring.get(Utils.EVENT_TIMESTAMP) != null ? (String) monitoring.get(Utils.EVENT_TIMESTAMP) : "";
        return Utils.getLongFromDateStr(eventTimestamp);
    }

    @Override
    public Watermark checkAndGetNextWatermark(Map<String, Object> monitoring, long extractedTimestamp) {
        long extractedTS = getExtractedTS(monitoring);
        long nextWatermark = extractedTimestamp - bound;
        return new Watermark(nextWatermark);
    }
}

TIA