Events can overtake watermarks

Gyula Fóra-2
Hi,
In 1.5.1, I have noticed some strange behaviour that happens quite frequently, and I just want to double-check with you whether this is intended.

If I have a non-parallel source that takes the following actions:

emit: event1
emit: watermark1
emit: event2

it can happen that a downstream operator receives watermark1 after event2. It doesn't happen very often, but it definitely seems to happen sometimes.

Maybe this is a property of the broadcastEmit(..) method, but it seems a little funny :)
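A minimal model of the emit sequence above, assuming that events go to a single output channel (round-robin here) while watermarks are broadcast to every channel, and that each channel is a FIFO queue. The class `BroadcastModel` and its methods are hypothetical and only mirror the `broadcastEmit(..)` name from the post; this is not Flink's actual record writer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model (not Flink code): events are appended to one channel chosen
// round-robin, watermarks are appended to every channel. Each channel is an
// append-only FIFO list, so order within a single channel is preserved.
class BroadcastModel {
    final List<List<String>> channels = new ArrayList<>();
    private int nextChannel = 0;

    BroadcastModel(int numChannels) {
        for (int i = 0; i < numChannels; i++) {
            channels.add(new ArrayList<>());
        }
    }

    // Regular record: goes to exactly one downstream channel.
    void emit(String event) {
        channels.get(nextChannel).add(event);
        nextChannel = (nextChannel + 1) % channels.size();
    }

    // Watermark: copied into every downstream channel.
    void broadcastEmit(String watermark) {
        for (List<String> channel : channels) {
            channel.add(watermark);
        }
    }

    public static void main(String[] args) {
        BroadcastModel writer = new BroadcastModel(4);
        writer.emit("event1");
        writer.broadcastEmit("watermark1");
        writer.emit("event2");
        System.out.println(writer.channels);
    }
}
```

Under this model, watermark1 is queued ahead of event2 on every channel that carries event2, so a reordering would have to come from how the receiving side interleaves several channels rather than from the broadcast itself.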

Thanks for the clarification!

Gyula

Re: Events can overtake watermarks

Aljoscha Krettek
Hi,

Does this happen only with Flink 1.5.1? In a parallel setting, I would expect that a watermark can be delayed downstream because the watermarks from the other inputs have not yet caught up to "watermark1" (the watermark at an operator is the minimum of its input watermarks). Could that be the case?
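A minimal sketch of the "minimum of the input watermarks" rule, assuming an operator that remembers the last watermark per input channel and only advances its own watermark when that minimum increases. `MinWatermarkTracker` is a hypothetical class for illustration, not Flink's internal implementation.

```java
import java.util.Arrays;
import java.util.OptionalLong;

// Hypothetical sketch: the operator watermark is the minimum over all input
// channels, so one slow channel holds back the watermark for the whole operator.
class MinWatermarkTracker {
    private final long[] channelWatermarks;
    private long currentWatermark = Long.MIN_VALUE;

    MinWatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Records a watermark from one channel; returns the new operator watermark
    // if it advanced, or empty if another channel is still behind.
    OptionalLong onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        if (min > currentWatermark) {
            currentWatermark = min;
            return OptionalLong.of(min);
        }
        return OptionalLong.empty();
    }

    long currentWatermark() {
        return currentWatermark;
    }

    public static void main(String[] args) {
        MinWatermarkTracker tracker = new MinWatermarkTracker(2);
        System.out.println(tracker.onWatermark(0, 10)); // empty: channel 1 has sent no watermark yet
        System.out.println(tracker.onWatermark(1, 10)); // advances to 10: both channels reached 10
    }
}
```

With this rule, a watermark that has already arrived on one channel is not visible to the operator until every other channel has caught up, which is the delay described above.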

Best,
Aljoscha

> On 22. Jul 2018, at 22:19, Gyula Fóra <[hidden email]> wrote:

Re: Events can overtake watermarks

Stefan Richter
In reply to this post by Gyula Fóra-2
Hi,

events overtaking watermarks doesn't sound like "wrong" behaviour; only watermarks overtaking events would be bad. Do you think this only started with Flink 1.5? To me this does not sound like a problem, but I'm not sure whether it is intended. Looping in Aljoscha, just in case.
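A minimal sketch of why only one direction is harmful, assuming the convention that a watermark with timestamp t promises that no further events with timestamp <= t will arrive. The class `LatenessCheck` and the timestamps (event1 = 1, watermark1 = 1, event2 = 2) are hypothetical placeholders chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: replays one operator's arrival order and reports events
// that arrive late, i.e. with a timestamp <= the watermark already processed.
class LatenessCheck {
    // An element is either an event or a watermark; both carry a timestamp.
    record Element(String name, boolean isWatermark, long timestamp) {}

    static List<String> lateEvents(List<Element> arrivalOrder) {
        long watermark = Long.MIN_VALUE;
        List<String> late = new ArrayList<>();
        for (Element e : arrivalOrder) {
            if (e.isWatermark()) {
                watermark = Math.max(watermark, e.timestamp());
            } else if (e.timestamp() <= watermark) {
                late.add(e.name()); // a watermark overtook this event
            }
        }
        return late;
    }

    public static void main(String[] args) {
        Element event1 = new Element("event1", false, 1);
        Element watermark1 = new Element("watermark1", true, 1);
        Element event2 = new Element("event2", false, 2);
        // event2 overtakes watermark1: nothing is late.
        System.out.println(lateEvents(List.of(event1, event2, watermark1)));
        // watermark1 overtakes event1: event1 is late.
        System.out.println(lateEvents(List.of(watermark1, event1, event2)));
    }
}
```

An event that arrives before a watermark it is not covered by is still processed on time; only an event that arrives after a watermark promising it would not come ends up late.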

Best,
Stefan

Re: Events can overtake watermarks

Gyula Fóra-2
Hi guys,

Let me clarify. There is a single source with parallelism 1 and a single downstream operator with parallelism > 1.
So the watermark is strictly controlled by the source. Also, I am talking about calls to the processWatermark function of the downstream operator, not about the watermark computation in general.

So in this case the source calls:

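// watermark1 is a Watermark; timestamp1 and timestamp2 are the events' long timestamps (placeholders)
ctx.collectWithTimestamp(event1, timestamp1);
ctx.emitWatermark(watermark1);
ctx.collectWithTimestamp(event2, timestamp2);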

At the downstream operator, event2 is sometimes processed before watermark1. For example, if the operator has parallelism 4, three of the subtasks will probably receive watermark1 before event2 as expected, but one of them receives them in the reverse order.

@Stefan: I haven't tried this on 1.4.*, but I haven't noticed this before.

Gyula

On Mon, 23 Jul 2018 at 10:29, Stefan Richter <[hidden email]> wrote:

Re: Events can overtake watermarks

Gyula Fóra-2
Hm, I wonder whether it could be because the downstream operator is a two-input operator and I do some filtering on the source elements to direct each of them to one of the two inputs.
The filtering logic is chained, but I guess in this case the downstream operator can read two events from one input channel even though the other channel should also have an element.
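A small simulation of the setup described above, assuming that event1 is routed to input 0, event2 to input 1, and watermark1 is forwarded to both inputs; each input is FIFO, but the operator may drain the two inputs in any interleaving, and its combined watermark is the minimum over both inputs. `TwoInputReordering`, the routing, the timestamps, and the read orders are all hypothetical; the trace labels are illustrative and not Flink's exact method names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical simulation (not Flink code): two FIFO inputs feeding one operator
// whose watermark is the minimum of the per-input watermarks.
class TwoInputReordering {
    record Element(String name, boolean isWatermark, long timestamp) {}

    // Replays the inputs in the given read order and records the operator's calls.
    static List<String> process(List<Deque<Element>> inputs, int[] readOrder) {
        long[] inputWatermarks = new long[inputs.size()];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
        long operatorWatermark = Long.MIN_VALUE;
        List<String> calls = new ArrayList<>();
        for (int input : readOrder) {
            Element e = inputs.get(input).poll();
            if (e == null) {
                continue; // nothing buffered on this input
            }
            if (!e.isWatermark()) {
                calls.add("processElement(" + e.name() + ")");
                continue;
            }
            inputWatermarks[input] = e.timestamp();
            long min = Arrays.stream(inputWatermarks).min().getAsLong();
            if (min > operatorWatermark) { // fires only once both inputs have the watermark
                operatorWatermark = min;
                calls.add("processWatermark(" + min + ")");
            }
        }
        return calls;
    }

    // Source emits event1 -> input 0, watermark1 -> both inputs, event2 -> input 1.
    static List<Deque<Element>> buildInputs() {
        Element event1 = new Element("event1", false, 1);
        Element watermark1 = new Element("watermark1", true, 1);
        Element event2 = new Element("event2", false, 2);
        Deque<Element> input0 = new ArrayDeque<>(List.of(event1, watermark1));
        Deque<Element> input1 = new ArrayDeque<>(List.of(watermark1, event2));
        return List.of(input0, input1);
    }

    public static void main(String[] args) {
        // Input 0 drained first: processWatermark(1) happens before event2.
        System.out.println(process(buildInputs(), new int[] {0, 0, 1, 1}));
        // Input 1 drained first: event2 is processed before processWatermark(1).
        System.out.println(process(buildInputs(), new int[] {1, 1, 0, 0}));
    }
}
```

In this model, no single input ever reorders its elements; event2 only appears to overtake watermark1 because the operator cannot advance its watermark until watermark1 has also been read from the other input.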

Gyula

On Mon, 23 Jul 2018 at 10:44, Gyula Fóra <[hidden email]> wrote:

Re: Events can overtake watermarks

Gyula Fóra-2
Yeah, now that I think about it, that's probably the case. Sorry to bother :)

Gyula

On Mon, 23 Jul 2018 at 11:04, Gyula Fóra <[hidden email]> wrote: