I have a job with an event-time session window with a 30-minute gap. I route late events to a side output, where I apply a 30-minute tumbling processing-time window. I observe that the late events are written to storage before the "main" events. I'd like to know whether this is normal before I dig into the code and debug the problem. Thanks
I've run an experiment where I attached an evictor to the main window (not the late one), only to write a debug file when the window fires. I don't actually evict any events; I use the evictor as a hook so I can write a debug object the moment the window fires. I can see that the late-data window indeed fires before the main window: at a point where the debug file does not exist yet, late events _do_ already exist in the destination. Writing the debug object in the evictor rules out problems caused by logic in the process function, and it shows that the window for the late events really does fire before the main window.

Here's an outline of my job:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, TumblingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import scala.concurrent.duration._ // assumed: provides 10.minutes for IgnoreFutureTimestamps

val windowedStream = senv
  .addSource(kafkaSource)
  ... // some operators
  // like BoundedOutOfOrderness, but ignores future timestamps
  .assignTimestampsAndWatermarks(new IgnoreFutureTimestamps(10.minutes))
  ... // some more operators
  .keyBy { case (meta, _) => meta.toPath }
  .window(EventTimeSessionWindows.withGap(Time.minutes(30))) // "main" window
  .sideOutputLateData(lateDataTag)
  .process(new ProcessSession(sessionPlayback, config))

windowedStream
  .map(new SerializeSession(sessionPlayback))
  .addSink(sink)

windowedStream
  .getSideOutput(lateDataTag)
  .keyBy { case (meta, _) => meta.toPath }
  .window(TumblingProcessingTimeWindows.of(Time.minutes(30))) // "late" window
  .process(new ProcessSession(sessionPlayback, config, true))
  .map(new SerializeSession(sessionPlayback, late = true))
```

So, to repeat the question: is this normal? And if not, how can I fix it? Thanks

On Tue, Oct 6, 2020 at 3:44 PM Ori Popowski <[hidden email]> wrote:
After creating a toy example, I think I had the concept of sideOutputLateData wrong. It seems that the late-data side output has nothing to do with window firing: late events go straight to the side output as soon as they arrive, and they never trigger a window firing in the main flow for that specific key.

On Wed, Oct 7, 2020 at 2:42 PM Ori Popowski <[hidden email]> wrote:
Hi Ori, you are right. Events are sent to the side output for late events if the event's timestamp plus the allowed lateness is less than or equal to the current watermark. These events are forwarded immediately and are seen directly by the downstream operators that consume the late-data side output. Cheers, Till

On Wed, Oct 7, 2020 at 2:32 PM Ori Popowski <[hidden email]> wrote:
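To make the ordering concrete, here is a minimal, hypothetical sketch in plain Scala (no Flink dependency) of how a session-window operator might route elements under the rule above. It is a simplified model, not Flink's actual WindowOperator: the names LateRoutingSketch, Event, Session, run and demo are made up for illustration, each event is paired with the watermark observed just before it, and re-firing during a non-zero allowed lateness is not modeled. As I understand Flink's handling of merging windows, an element that can still merge into a live session is kept even if its timestamp is behind the watermark; only an element that lands in no live window and satisfies timestamp + allowedLateness <= watermark goes to the side output, and it is emitted right away instead of waiting for any main session to fire.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified model of late-data routing for session windows.
object LateRoutingSketch {
  final case class Event(key: String, ts: Long)
  final case class Session(key: String, start: Long, end: Long) // end is exclusive

  // Element-level rule from the thread: late once ts + allowedLateness <= watermark.
  def isElementLate(ts: Long, allowedLateness: Long, watermark: Long): Boolean =
    ts + allowedLateness <= watermark

  // A window is past cleanup once its max timestamp (end - 1) + allowedLateness <= watermark.
  def isWindowLate(w: Session, allowedLateness: Long, watermark: Long): Boolean =
    w.end - 1 + allowedLateness <= watermark

  // Returns the emitted records in order: "main ..." for fired sessions, "late ..." for side output.
  def run(input: Seq[(Event, Long)], gap: Long, allowedLateness: Long): Vector[String] = {
    val open = ArrayBuffer.empty[Session]
    val out = Vector.newBuilder[String]

    input.foreach { case (ev, wm) =>
      // 1. Fire (and, in this sketch, discard) every session whose end the watermark has passed.
      val due = open.filter(w => w.end - 1 <= wm).sortBy(w => (w.end, w.key))
      due.foreach(w => out += s"main ${w.key} [${w.start},${w.end})")
      open --= due

      // 2. Merge the element's own window [ts, ts + gap) with overlapping sessions of the same key.
      val overlapping = open.filter(w => w.key == ev.key && w.start < ev.ts + gap && ev.ts < w.end)
      val merged = overlapping.foldLeft(Session(ev.key, ev.ts, ev.ts + gap)) { (acc, w) =>
        Session(acc.key, math.min(acc.start, w.start), math.max(acc.end, w.end))
      }

      if (!isWindowLate(merged, allowedLateness, wm)) {
        open --= overlapping
        open += merged
      } else if (isElementLate(ev.ts, allowedLateness, wm)) {
        // 3. No live window took the element: it goes to the side output immediately.
        out += s"late ${ev.key}@${ev.ts}"
      }
    }
    out.result()
  }

  // Key "a" has a live session while an old event for "a" arrives.
  def demo: Vector[String] = run(
    Seq(
      (Event("a", 100L), 90L),
      (Event("a", 120L), 110L),
      (Event("a", 105L), 112L), // behind the watermark, but merges into the live session
      (Event("a", 40L), 115L),  // no live window to merge into: routed to the side output
      (Event("a", 190L), 180L)  // watermark passes 149, so session [100,150) fires
    ),
    gap = 30L,
    allowedLateness = 0L
  )
}
```

With a gap of 30 and zero allowed lateness, LateRoutingSketch.demo evaluates to Vector("late a@40", "main a [100,150)"): the late record for key a is emitted before the still-open main session for the same key fires, which is the ordering observed in the first post.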
Thanks

On Wed, Oct 7, 2020 at 7:06 PM Till Rohrmann <[hidden email]> wrote: