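Dear list-members,

I have a question regarding window firing and element accumulation for a sliding window on a DataStream (Flink 1.8.1-2.12).

My DataStream is derived from a custom SourceFunction, which emits string sequences of WINDOW size, in a deterministic sequence. The aim is to create sliding windows over the keyed stream for processing on the accumulated strings, based on EventTime. To assign EventTime and watermarks, I attach an AssignerWithPeriodicWatermarks to the stream. The sliding window is processed with a custom ProcessWindowFunction.

    env.setStreamTimeCharacteristic(EventTime)
    val seqStream = env.addSource(Seqstream)
        .assignTimestampsAndWatermarks(SeqTimeStampExtractor())
        .keyBy(getEventtimeKey)
        .window(SlidingEventTimeWindows.of(Time.milliseconds(windowSize), Time.milliseconds(slideSize)))

    val result = seqStream.process(ProcessSeqWindow(target1))

My AssignerWithPeriodicWatermarks looks like this.

    class FASTATimeStampExtractor : AssignerWithPeriodicWatermarks<FASTAstring> {
        var waterMark = 9999L
        override fun extractTimestamp(element: FASTAstring, previousElementTimestamp: Long): Long {
            return element.f1
        }

        override fun getCurrentWatermark(): Watermark? {
            waterMark += 1
            return Watermark(waterMark)
        }
    }

In other words, each element emitted by the source should have its own EventTime, and the watermark should be emitted allowing no further events for that time. Stepping through the stream in a debugger indicates that EventTime / watermarks are generated as I would expect.

My expectation is that ProcessSeqWindow.run() ought to be called with a number of elements proportional to the time window (e.g. 10 ms), over EventTime. However, what I observe is that run() is called multiple times with single elements, and in an arbitrary sequence with respect to EventTime.

My question is whether this is likely to be caused by multiple trigger events on each window, or are there other possible explanations? How can I debug the cause?

Thanks,

Eric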
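To make the expectation about window contents concrete, here is a rough, stdlib-only Kotlin sketch of how a sliding event-time window assigner maps one timestamp to windows, and when such a window becomes eligible to fire. The names Window, slidingWindowsFor and isReadyToFire are made up for illustration and are not Flink classes; the sketch assumes a window offset of 0 and uses floorMod so that negative timestamps also behave sensibly, which is a choice made here rather than something taken from the thread.

```kotlin
// Illustrative model only; these are not Flink types.
data class Window(val start: Long, val end: Long) // covers [start, end)

// Returns every sliding window (offset 0) that contains the given timestamp.
fun slidingWindowsFor(timestamp: Long, size: Long, slide: Long): List<Window> {
    require(size > 0 && slide > 0) { "size and slide must be positive" }
    // Start of the latest window that still contains the timestamp.
    val lastStart = timestamp - Math.floorMod(timestamp, slide)
    val windows = mutableListOf<Window>()
    var start = lastStart
    // Walk backwards one slide at a time while the window still covers the timestamp.
    while (start > timestamp - size) {
        windows.add(Window(start, start + size))
        start -= slide
    }
    return windows
}

// An event-time window can fire once the watermark reaches its last timestamp (end - 1).
fun isReadyToFire(window: Window, watermark: Long): Boolean =
    watermark >= window.end - 1
```

With size 10 and slide 5, for example, every element lands in two overlapping windows, so how many elements a window holds when it fires depends on how far the watermark has advanced relative to the element timestamps.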
I should add that the behaviour persists, even when I force parallelism to 1. On Wed, Aug 21, 2019 at 5:19 PM Eric Isling <[hidden email]> wrote:
In reply to this post by Eric Isling
The role of the watermarks in your job will be to trigger the closing of the sliding event time windows. In order to play that role properly, they should be based on the timestamps in the events, rather than on some arbitrary constant (9999L). The reason the same object is responsible for both extracting timestamps and supplying watermarks is so that it can base the watermarks it creates on its observations of the timestamps in the event stream. So unless your event timestamps are also based on incrementing a similar counter, this may explain some of the behavior you are seeing.

Another issue is that while extractTimestamp is called for every event, in a periodic watermark assigner the getCurrentWatermark method is called in a separate thread once every 200 msec (by default). If you want watermarks after every event you'll need to use an AssignerWithPunctuatedWatermarks, though doing so is something of an anti-pattern (because having that many watermarks adds overhead).

If your timestamps are completely artificial, you might find a SlidingCountWindow a more natural fit for what you're doing.

On Wed, Aug 21, 2019 at 6:20 PM Eric Isling <[hidden email]> wrote:
>
> Dear list-members,
>
> I have a question regarding window firing and element accumulation for a sliding window on a DataStream (Flink 1.8.1-2.12).
>
> My DataStream is derived from a custom SourceFunction, which emits string sequences of WINDOW size, in a deterministic sequence.
> The aim is to create sliding windows over the keyed stream for processing on the accumulated strings, based on EventTime.
> To assign EventTime and watermarks, I attach an AssignerWithPeriodicWatermarks to the stream.
> The sliding window is processed with a custom ProcessWindowFunction.
>
> env.setStreamTimeCharacteristic(EventTime)
> val seqStream = env.addSource(Seqstream)
>     .assignTimestampsAndWatermarks(SeqTimeStampExtractor())
>     .keyBy(getEventtimeKey)
>     .window(SlidingEventTimeWindows.of(Time.milliseconds(windowSize), Time.milliseconds(slideSize)))
>
> val result = seqStream.process(ProcessSeqWindow(target1))
>
> My AssignerWithPeriodicWatermarks looks like this.
> class FASTATimeStampExtractor : AssignerWithPeriodicWatermarks<FASTAstring> {
>     var waterMark = 9999L
>     override fun extractTimestamp(element: FASTAstring, previousElementTimestamp: Long): Long {
>         return element.f1
>     }
>
>     override fun getCurrentWatermark(): Watermark? {
>         waterMark += 1
>         return Watermark(waterMark)
>     }
> }
>
> In other words, each element emitted by the source should have its own EventTime, and the watermark should be emitted allowing no further events for that time.
> Stepping through the stream in a debugger indicates that EventTime / watermarks are generated as I would expect.
>
> My expectation is that ProcessSeqWindow.run() ought to be called with a number of elements proportional to the time window (e.g. 10 ms), over EventTime. However, what I observe is that run() is called multiple times with single elements, and in an arbitrary sequence with respect to EventTime.
>
> My question is whether this is likely to be caused by multiple trigger events on each window, or are there other possible explanations? How can I debug the cause?
>
> Thanks,
>
> Eric
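The suggestion in the reply above, deriving watermarks from the timestamps actually observed instead of from a free-running counter, could look roughly like the following stdlib-only Kotlin sketch. The class name ObservedTimestampWatermarks, its two methods and the maxOutOfOrderness parameter are hypothetical; in a real job the same bookkeeping would presumably live inside extractTimestamp and getCurrentWatermark of the assigner. Subtracting 1 so that a later element with an equal timestamp is not treated as late is also an assumption made here, not something stated in the thread.

```kotlin
// Hypothetical helper, not a Flink class: tracks the highest timestamp seen
// and derives the watermark from it.
class ObservedTimestampWatermarks(private val maxOutOfOrderness: Long = 0L) {
    // Highest event timestamp seen so far; null until the first event arrives.
    private var maxSeen: Long? = null

    // Call once per event: remembers the largest timestamp and returns the event's own.
    fun onEvent(eventTimestamp: Long): Long {
        val current = maxSeen
        if (current == null || eventTimestamp > current) maxSeen = eventTimestamp
        return eventTimestamp
    }

    // Call periodically: null means no watermark can be derived yet.
    // The watermark trails the highest timestamp by the allowed disorder plus 1.
    fun currentWatermark(): Long? = maxSeen?.let { it - maxOutOfOrderness - 1 }
}
```

Because the watermark is only read periodically, several events may pass between two watermarks; a sketch like this only guarantees that the watermark never runs ahead of the data.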
In reply to this post by Eric Isling
If you still need help diagnosing the cause of the misbehavior, please share more of the code with us.

On Wed, Aug 21, 2019 at 6:24 PM Eric Isling <[hidden email]> wrote:
>
> I should add that the behaviour persists, even when I force parallelism to 1.