I ran an experiment in which I attached an evictor to the main window (not the late one), purely to write a debug file when the window fires. The evictor doesn't actually evict any events; it only lets me write a debug object at the moment the window fires.
The late-data window does indeed fire before the main window: the debug file does not exist, yet late events _do_ exist in the destination.
Writing the debug object from the evictor rules out problems caused by logic in the process function, and it shows that the late-events window fires before the main window.
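A no-op evictor of this kind could look roughly like the sketch below. It is an illustration, not the exact class from the job: the name `DebugEvictor` and the `debugFile` parameter are hypothetical, and it assumes a local filesystem path that the task can write to. It relies on Flink calling `evictBefore` right before the window function runs, so a line in the file means the window fired.

```scala
import java.lang.{Iterable => JIterable}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths, StandardOpenOption}

import org.apache.flink.streaming.api.windowing.evictors.Evictor
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue

// Hypothetical debug helper: never removes elements, only records that the window fired.
class DebugEvictor[T, W <: Window](debugFile: String) extends Evictor[T, W] {

  // Called when the trigger fires, just before the window function is invoked.
  override def evictBefore(
      elements: JIterable[TimestampedValue[T]],
      size: Int,
      window: W,
      ctx: Evictor.EvictorContext): Unit = {
    val line =
      s"fired window=$window size=$size " +
        s"watermark=${ctx.getCurrentWatermark} " +
        s"processingTime=${ctx.getCurrentProcessingTime}\n"
    // Append one line per firing; assumes debugFile is a writable local path.
    Files.write(
      Paths.get(debugFile),
      line.getBytes(StandardCharsets.UTF_8),
      StandardOpenOption.CREATE,
      StandardOpenOption.APPEND)
    ()
  }

  // Intentionally empty: nothing is evicted after the window function either.
  override def evictAfter(
      elements: JIterable[TimestampedValue[T]],
      size: Int,
      window: W,
      ctx: Evictor.EvictorContext): Unit = ()
}
```

It would be attached with `.evictor(new DebugEvictor[...](...))` between `.window(...)` and `.process(...)` on the main window. Logging the current watermark next to the window bounds also makes it easier to see how far event time had advanced when the window fired.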
Here's an outline of my job:

val windowedStream = senv
  .addSource(kafkaSource)
  ... // some operators
  // like BoundedOutOfOrderness, but ignores future timestamps
  .assignTimestampsAndWatermarks(new IgnoreFutureTimestamps(10.minutes))
  ... // some more operators
  .keyingBy { case (meta, _) => meta.toPath }
  .window(EventTimeSessionWindows.withGap(Time.minutes(30))) // "main" window
  .sideOutputLateData(lateDataTag)
  .process(new ProcessSession(sessionPlayback, config))

windowedStream
  .map(new SerializeSession(sessionPlayback))
  .addSink(sink)

windowedStream
  .getSideOutput(lateDataTag)
  .keyingBy { case (meta, _) => meta.toPath }
  .window(TumblingProcessingTimeWindows.of(Time.minutes(30))) // "late" window
  .process(new ProcessSession(sessionPlayback, config, true))
  .map(new SerializeSession(sessionPlayback, late = true))

So, to repeat the question: is that normal? And if not, how can I fix it?
Thanks