Timestamps and watermarks in CoProcessFunction function

William Saar-2

Hi,
I have added the code below to the start of processElement2 in a CoProcessFunction. For each new watermark, it prints the watermark and the timestamps of the first 3 elements that follow it. Shouldn't a timestamp always be lower than the next watermark? The 3 timestamps before the last watermark are all larger than that watermark.

The output I get is
wm -9223372036854775808
ts 1478815851242
ts 1478816075096
ts 1478816114186
wm 1478822353934
ts 1478835814359
ts 1478835083219
ts 1478836126621
wm 1478827220420
ts 1478836408336
ts 1478836469247
ts 1478836759959

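// printedWatermark and n are mutable fields of the function (declarations not shown).
// Only subtask 0 prints, to keep the output readable.
if (getRuntimeContext.getIndexOfThisSubtask == 0) {
  val currentWatermark = context.timerService().currentWatermark()
  if (currentWatermark != printedWatermark) {
    // The watermark advanced since the last element: print it and restart the count.
    printedWatermark = currentWatermark
    println("wm " + printedWatermark)
    n = 0
  } else {
    n += 1
  }
  // Print the timestamps of the first 3 elements seen under this watermark.
  if (n < 3) {
    println("ts " + context.timestamp())
  }
}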



Re: Timestamps and watermarks in CoProcessFunction function

Eron Wright
Consider the watermarks produced by your chosen watermark generator as an assertion about the progression of event time, based on domain knowledge, observation of elements, and connector specifics. The generator is asserting that any elements observed after a given watermark will have a later event time than that watermark, e.g. "we've reached 12:00 PM; subsequent events will have a timestamp greater than 12:00 PM".

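Your specific output seems fine to me. It reads like, "event @ 11:59, watermark @ 12:00, event @ 12:02, watermark @ 12:01, event @ 12:03". A watermark only constrains the elements that come after it, so an earlier element may well carry a timestamp ahead of a later watermark. The watermark assertion wasn't violated in this situation.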
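To make that check concrete, here is a minimal sketch in plain Scala (no Flink dependency) that replays the printed output from the original post and looks for elements that arrived at or behind the watermark that was current at the time. The names WatermarkCheck, Wm, Ts and lateTimestamps are made up for this illustration, and the lateness rule (timestamp <= current watermark) is my reading of the assertion described above.

```scala
// Replays the "wm"/"ts" lines from the post and flags late elements.
// Assumption: an element is late when its timestamp is <= the watermark
// that was current when the element was processed.
object WatermarkCheck {
  sealed trait Line
  final case class Wm(value: Long) extends Line
  final case class Ts(value: Long) extends Line

  // Output copied from the post, in the order it was printed.
  val printed: List[Line] = List(
    Wm(Long.MinValue), // printed as -9223372036854775808
    Ts(1478815851242L), Ts(1478816075096L), Ts(1478816114186L),
    Wm(1478822353934L),
    Ts(1478835814359L), Ts(1478835083219L), Ts(1478836126621L),
    Wm(1478827220420L),
    Ts(1478836408336L), Ts(1478836469247L), Ts(1478836759959L)
  )

  // Returns the timestamps that were at or behind the current watermark.
  def lateTimestamps(lines: List[Line]): List[Long] = {
    val (_, late) = lines.foldLeft((Long.MinValue, List.empty[Long])) {
      case ((_, acc), Wm(w))  => (w, acc)                              // new watermark
      case ((wm, acc), Ts(t)) => (wm, if (t <= wm) t :: acc else acc)  // compare element
    }
    late.reverse
  }

  def main(args: Array[String]): Unit =
    println("late elements: " + lateTimestamps(printed))
}
```

Run against the posted output, no timestamp is flagged: every element is ahead of the watermark that preceded it, which is all the assertion requires.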

Some operators provide special "late event" handling logic for the case where the assertion is violated. The process function is quite flexible, providing timers to observe the progression of time (due to watermarks), and making it possible to handle late events as you see fit. Often a process function will buffer events until a certain time is reached.
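The buffering idea can be sketched without Flink as a small simulation. This is not the Flink API: EventBuffer, onElement and onWatermark are hypothetical names, and in a real process function the buffer would live in managed state and be released from a timer callback. The sketch only shows the logic of holding events until the watermark reaches them and rejecting late ones.

```scala
import scala.collection.mutable

// Plain-Scala simulation, not Flink API: buffers events and releases them in
// timestamp order once the watermark has reached their timestamp.
final class EventBuffer[A] {
  private var watermark: Long = Long.MinValue

  // PriorityQueue is a max-heap, so reverse the ordering to pop the
  // smallest timestamp first.
  private val pending =
    mutable.PriorityQueue.empty[(Long, A)](Ordering.by[(Long, A), Long](_._1).reverse)

  // Returns false for a late event (timestamp <= current watermark) and does
  // not buffer it; the caller decides how to handle it.
  def onElement(timestamp: Long, value: A): Boolean =
    if (timestamp <= watermark) false
    else { pending.enqueue((timestamp, value)); true }

  // Advances the watermark (it never moves backwards) and returns every
  // buffered event whose timestamp it now covers.
  def onWatermark(newWatermark: Long): List[(Long, A)] = {
    watermark = math.max(watermark, newWatermark)
    val ready = mutable.ListBuffer.empty[(Long, A)]
    while (pending.nonEmpty && pending.head._1 <= watermark) ready += pending.dequeue()
    ready.toList
  }
}
```

With the times from the example above written as plain numbers, an element at 1159 is released by the watermark 1200, an element at 1202 stays buffered through the watermark 1201, and an element at 1200 arriving after the watermark 1200 is reported as late.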

Hope this helps!
Eron

On Tue, Jan 16, 2018 at 8:51 AM, William Saar <[hidden email]> wrote:
> [...]