Multiple trigger events on keyed window

Eric Isling
Dear list-members,

I have a question regarding window firing and element accumulation for a sliding window on a DataStream (Flink 1.8.1-2.12).

My DataStream is derived from a custom SourceFunction, which emits string sequences of WINDOW size, in a deterministic sequence.
The aim is to create sliding windows over the keyed stream for processing on the accumulated strings, based on EventTime.
To assign EventTime and Watermark, I attach an AssignerWithPeriodicWatermarks to the stream.
The sliding window is processed with a custom ProcessWindowFunction.

env.setStreamTimeCharacteristic(EventTime)
val seqStream = env.addSource(Seqstream)
    .assignTimestampsAndWatermarks(SeqTimeStampExtractor())
    .keyBy(getEventtimeKey)
    .window(SlidingEventTimeWindows.of(Time.milliseconds(windowSize), Time.milliseconds(slideSize)))

val result = seqStream.process(ProcessSeqWindow(target1))

My AssignerWithPeriodicWatermarks looks like this:
class FASTATimeStampExtractor : AssignerWithPeriodicWatermarks<FASTAstring> {
    var waterMark = 9999L
    override fun extractTimestamp(element: FASTAstring, previousElementTimestamp: Long): Long {
        return element.f1
    }

    override fun getCurrentWatermark(): Watermark? {
        waterMark += 1
        return Watermark(waterMark)
    }
}

In other words, each element emitted by the source should have its own EventTime, and the Watermark should be emitted allowing no further events for that time.
Stepping through the stream in a debugger indicates that EventTime / Watermarks are generated as I would expect.

My expectation is that ProcessSeqWindow.run() ought to be called with a number of elements proportional to the time window (e.g. 10 ms), over EventTime. However, what I observe is that run() is called multiple times with single elements, and in an arbitrary sequence with respect to EventTime.

My question is whether this is likely to be caused by multiple trigger events on each window, or are there other possible explanations? How can I debug the cause?

Thanks,
Eric

Re: Multiple trigger events on keyed window

Eric Isling
I should add that the behaviour persists, even when I force parallelism to 1.

On Wed, Aug 21, 2019 at 5:19 PM Eric Isling <[hidden email]> wrote:
> [...]

Re: Multiple trigger events on keyed window

David Anderson-2
In reply to this post by Eric Isling
The role of the watermarks in your job will be to trigger the closing
of the sliding event time windows. In order to play that role
properly, they should be based on the timestamps in the events, rather
than some arbitrary constant (9999L). The reason why the same object
is responsible for extracting timestamps and supplying watermarks is
so that this object can base the watermarks it creates on its
observations of the timestamps in the event stream. So unless your
event timestamps are also based on incrementing a similar counter,
this may explain some of the behavior you are seeing.
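
To make the point above concrete, here is a minimal sketch in plain Kotlin, with no Flink dependency, of a watermark derived from the largest timestamp observed so far and of how such a watermark closes sliding windows. Everything in it (Event, Window, MaxTimestampWatermarker, slidingWindowsFor, fires, the maxOutOfOrdernessMs parameter) is a hypothetical stand-in rather than Flink API. The window arithmetic is intended to mirror how sliding event-time windows with offset 0 are assigned, but treat it as an illustration under that assumption, not as the library's implementation.

```kotlin
// Plain-Kotlin model; no Flink dependency. All names here are illustrative.
data class Event(val payload: String, val timestamp: Long)
data class Window(val start: Long, val end: Long) // covers [start, end)

// Mirrors the two callbacks of a periodic watermark assigner.
class MaxTimestampWatermarker(private val maxOutOfOrdernessMs: Long = 0L) {
    private var maxSeen = Long.MIN_VALUE

    // Called for every event: remember the largest timestamp seen so far.
    fun extractTimestamp(element: Event): Long {
        maxSeen = maxOf(maxSeen, element.timestamp)
        return element.timestamp
    }

    // Called periodically: "no more events with timestamp <= this value".
    fun currentWatermark(): Long =
        if (maxSeen == Long.MIN_VALUE) Long.MIN_VALUE
        else maxSeen - maxOutOfOrdernessMs - 1
}

// Sliding windows (offset 0) that contain a non-negative timestamp.
fun slidingWindowsFor(timestamp: Long, size: Long, slide: Long): List<Window> {
    require(timestamp >= 0 && size > 0 && slide > 0)
    val windows = mutableListOf<Window>()
    var start = timestamp - timestamp % slide
    while (start > timestamp - size) {
        windows.add(Window(start, start + size))
        start -= slide
    }
    return windows
}

// An event-time window fires once the watermark reaches its last timestamp.
fun fires(window: Window, watermark: Long): Boolean = watermark >= window.end - 1
```

In this model, after events with timestamps 12 and 19 the watermark is 18, so with a 10 ms window sliding by 5 ms the window [5, 15) fires while [10, 20) stays open. A watermark that is unrelated to the timestamps gives no such guarantee about which windows are complete when they fire.
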

Another issue is that while extractTimestamp is called for every
event, in a periodic watermark assigner the getCurrentWatermark method
is called in a separate thread once every 200 msec (by default). If
you want watermarks after every event you'll need to use an
AssignerWithPunctuatedWatermarks, though doing so is something of an
anti-pattern (because having that many watermarks adds overhead).
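
The difference between the two styles can be sketched with a small plain-Kotlin simulation. This is only a model under stated assumptions: Arrival, periodicWatermarks and punctuatedWatermarks are made-up names, arrivals are assumed to be sorted by wall-clock time, and the timer is assumed to tick at exact multiples of the interval. It does not call Flink.

```kotlin
// Hypothetical model: when an event arrives (wall clock) and its event timestamp.
data class Arrival(val wallClockMs: Long, val eventTimestamp: Long)

// Periodic style: the watermark is sampled on a timer, independent of arrivals.
// Assumes `arrivals` is sorted by wallClockMs.
fun periodicWatermarks(arrivals: List<Arrival>, intervalMs: Long = 200L): List<Long> {
    val emitted = mutableListOf<Long>()
    var maxTs = Long.MIN_VALUE
    var lastEmitted = Long.MIN_VALUE
    var next = 0
    var tick = intervalMs
    while (next < arrivals.size) {
        // Consume everything that arrived up to this timer tick.
        while (next < arrivals.size && arrivals[next].wallClockMs <= tick) {
            maxTs = maxOf(maxTs, arrivals[next].eventTimestamp)
            next++
        }
        // Only emit when the watermark actually advances.
        if (maxTs != Long.MIN_VALUE && maxTs - 1 > lastEmitted) {
            lastEmitted = maxTs - 1
            emitted.add(lastEmitted)
        }
        tick += intervalMs
    }
    return emitted
}

// Punctuated style: a watermark candidate is considered after every event.
fun punctuatedWatermarks(arrivals: List<Arrival>): List<Long> {
    val emitted = mutableListOf<Long>()
    var lastEmitted = Long.MIN_VALUE
    for (a in arrivals) {
        val candidate = a.eventTimestamp - 1
        if (candidate > lastEmitted) {
            lastEmitted = candidate
            emitted.add(candidate)
        }
    }
    return emitted
}
```

For 50 events arriving 10 ms apart with timestamps 1000 to 1049, the periodic variant emits three watermarks (1019, 1039, 1048) while the punctuated variant emits fifty, which is the overhead referred to above.
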

If your timestamps are completely artificial, you might find a
sliding count window (countWindow(size, slide) on the keyed stream) a
more natural fit for what you're doing.
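
As a rough illustration of count-based sliding, the following plain-Kotlin sketch groups a list by element count instead of by time. The function name slidingCountWindows is hypothetical, and the firing rule (emit every `slide` elements, keeping at most the last `size`) is an assumption about how a sliding count window behaves for a single key, not a copy of Flink's code.

```kotlin
// Hypothetical helper: every `slide` elements, emit the most recent `size` elements.
fun <T> slidingCountWindows(elements: List<T>, size: Int, slide: Int): List<List<T>> {
    require(size > 0 && slide > 0)
    val windows = mutableListOf<List<T>>()
    for (count in slide..elements.size step slide) {
        // Early windows are shorter because fewer than `size` elements exist yet.
        windows.add(elements.subList(maxOf(0, count - size), count).toList())
    }
    return windows
}
```

With six elements, a size of 4 and a slide of 2, this yields [1, 2], [1, 2, 3, 4] and [3, 4, 5, 6]. No timestamps or watermarks are involved, so the contents of each window depend only on arrival order.
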

On Wed, Aug 21, 2019 at 6:20 PM Eric Isling <[hidden email]> wrote:
> [...]

Re: Multiple trigger events on keyed window

David Anderson-2
In reply to this post by Eric Isling
If you still need help diagnosing the cause of the misbehavior, please
share more of the code with us.

On Wed, Aug 21, 2019 at 6:24 PM Eric Isling <[hidden email]> wrote:
> [...]