Hi,
Event Time Window: 15s
My currentWatermark for Event Time processing is not increasing fast enough to go past the window maxTimestamp.
I have reduced the bound used for the watermark calculation to just 10 ms.
I have increased the source parallelism so that input from Kinesis is consumed in parallel by 2 slots on my laptop: env.addSource(kinesisConsumer).setParallelism(2);
For FlinkKinesisConsumer, I added a property that is available as of flink-1.8.0:
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS, "25"); // this didn't seem to help
The condition I need to satisfy is the one in EventTimeTrigger.java: if (window.maxTimestamp() <= ctx.getCurrentWatermark()) return TriggerResult.FIRE;
My event producer writes to Kinesis with a delay of 2500 ms between records (a business requirement).
What else can I do to consume data from Kinesis faster, so that currentWatermark
passes window.maxTimestamp sooner?
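The trigger condition above can be checked with a little arithmetic. The following is a minimal plain-Java sketch (no Flink dependency), assuming the first record's timestamp coincides with the start of a 15 s window, records are exactly 2500 ms apart in event time, and the watermark is the record timestamp minus the 10 ms bound. The class and method names are hypothetical and only for illustration; window.maxTimestamp is taken to be the window end minus 1 ms, as in Flink's TimeWindow.

```java
// Plain-Java sketch of the trigger arithmetic; names are hypothetical, not Flink API.
final class WatermarkProgress {

    /** Last timestamp that still belongs to the window starting at windowStart. */
    static long windowMaxTimestamp(long windowStart, long windowSizeMs) {
        return windowStart + windowSizeMs - 1;
    }

    /**
     * Returns the 0-based index of the first record whose punctuated watermark
     * (timestamp - boundMs) reaches maxTimestamp. Records are assumed to be
     * spaced intervalMs apart starting at firstTs; intervalMs must be positive.
     */
    static int firstFiringRecord(long firstTs, long intervalMs, long boundMs, long maxTimestamp) {
        int i = 0;
        while (firstTs + i * intervalMs - boundMs < maxTimestamp) {
            i++;
        }
        return i;
    }

    public static void main(String[] args) {
        long max = windowMaxTimestamp(0L, 15_000L);
        int idx = firstFiringRecord(0L, 2_500L, 10L, max);
        System.out.println("window.maxTimestamp = " + max);
        System.out.println("first record that can fire the window: #" + idx
                + " at t=" + (idx * 2_500L) + " ms");
    }
}
```

Under these assumptions the record at 15000 ms yields a watermark of 14990, which is still below 14999, so the window can only fire on the record at 17500 ms, regardless of how quickly Kinesis is read.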
MonitoringTSWAssigner code:
import java.util.Map;

import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class MonitoringTSWAssigner implements AssignerWithPunctuatedWatermarks<Map<String, Object>> {

    private long bound = 5 * 1000L; // 5 s out-of-order bound, in milliseconds
    private long maxTimestamp = Long.MIN_VALUE; // highest event timestamp seen so far (tracked, but not used for the watermark)

    public MonitoringTSWAssigner() {
    }

    public MonitoringTSWAssigner(long bound) {
        this.bound = bound;
    }

    @Override
    public long extractTimestamp(Map<String, Object> monitoring, long previousTS) {
        long extractedTS = getExtractedTS(monitoring);
        if (extractedTS > maxTimestamp) {
            maxTimestamp = extractedTS;
        }
        return extractedTS;
        //return System.currentTimeMillis();
    }

    public long getExtractedTS(Map<String, Object> monitoring) {
        final String eventTimestamp = monitoring.get(Utils.EVENT_TIMESTAMP) != null ? (String) monitoring.get(Utils.EVENT_TIMESTAMP) : "";
        return Utils.getLongFromDateStr(eventTimestamp);
    }

    @Override
    public Watermark checkAndGetNextWatermark(Map<String, Object> monitoring, long extractedTimestamp) {
        // The watermark trails the current record's timestamp by the out-of-order bound.
        long nextWatermark = extractedTimestamp - bound;
        return new Watermark(nextWatermark);
    }
}
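Because a punctuated assigner only emits a watermark when a record passes through it, a parallel subtask that receives no records emits none. As a hedged illustration of why that may matter with setParallelism(2): a downstream operator's event-time clock is generally the minimum of the latest watermarks across its input channels. The sketch below models only that rule in plain Java. MinWatermark and combined are hypothetical names, not Flink API, and whether one of the two subtasks is actually idle here (for example because the stream has a single shard) is an assumption that would need to be verified.

```java
import java.util.Arrays;

// Plain-Java model of combining per-channel watermarks; names are hypothetical, not Flink API.
final class MinWatermark {

    /** Combined watermark: the minimum of the latest watermark seen on each input channel. */
    static long combined(long... channelWatermarks) {
        return Arrays.stream(channelWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // One subtask has advanced; the other has not emitted any watermark yet.
        System.out.println(combined(17_490L, Long.MIN_VALUE));
        // Both subtasks have advanced; the slower one decides.
        System.out.println(combined(17_490L, 15_200L));
    }
}
```

In this model, a channel that never reports a watermark keeps the combined value at Long.MIN_VALUE, so the comparison against window.maxTimestamp would never succeed, no matter how small the bound is.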
TIA