Perform processing only when watermark updates, buffer data otherwise

Manas Kale
Hi,
I want to perform some processing on events only when the watermark is updated. Until the watermark arrives, I want to keep buffering all other events.
The main motivation behind doing this is that I have several operators that emit events/messages to a downstream operator. Since the order in which events arrive at the downstream operator is not guaranteed to be in chronological event time, I want to manually sort events when the watermark arrives and only then proceed.

Specifically, I want to first combine multiple streams and then do the above. Something like:
stream1.union(stream2, stream3)...

One solution I am exploring is using a global window with a trigger that will fire only when the watermark updates.
stream1.union(stream2, stream3)
    .keyBy(...)
    .window(GlobalWindows.create())
    .trigger(new OnWatermarkUpdateTrigger())
    .process(...);

I will store the latest watermark in the trigger's state store. In the onElement() method, I will FIRE if the current watermark is different from the stored one.
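The trigger idea above can be modeled in plain Java to see how it behaves. This is a sketch under stated assumptions, not Flink's Trigger API: the class and method names are hypothetical, the stored watermark stands in for the trigger's state, and FIRE_AND_PURGE is assumed (with plain FIRE, a GlobalWindow keeps its contents, so every firing would re-emit everything buffered so far).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of the proposed OnWatermarkUpdateTrigger logic.
// It does NOT use Flink's Trigger API; it only mimics the decision made in onElement().
class WatermarkChangeTriggerModel {
    private Long storedWatermark = null;                          // stand-in for the trigger's state
    private final List<Long> windowContents = new ArrayList<>(); // stand-in for the GlobalWindow's elements

    /**
     * Called once per arriving element, like Trigger.onElement().
     * Returns the batch that would be fired (FIRE_AND_PURGE assumed), or null for CONTINUE.
     */
    List<Long> onElement(long eventTimestamp, long currentWatermark) {
        windowContents.add(eventTimestamp);      // the element is buffered before the trigger decides
        Long previous = storedWatermark;
        storedWatermark = currentWatermark;
        if (previous == null || previous == currentWatermark) {
            return null;                         // first element, or watermark unchanged: keep buffering
        }
        List<Long> fired = new ArrayList<>(windowContents); // watermark changed since the last element: fire
        windowContents.clear();                  // purge so the next batch starts empty
        return fired;
    }
}
```

Two properties show up even in this model. Nothing fires while the watermark advances without new elements arriving, because a trigger's onElement() only runs per element. Also, the fired batch is unsorted and includes the triggering element (timestamp 12 when the watermark moves from 0 to 10) even though it is ahead of the new watermark, so the sorting would still have to happen in process().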

Is this the best way to implement the functionality described above?

Re: Perform processing only when watermark updates, buffer data otherwise

Manas Kale
Also
  • What happens to watermarks after a union operation? Do I have to call assignTimestampsAndWatermarks() again? I guess I will have to, since multiple streams are being combined and Flink needs to know how to resolve the individual watermarks.
  • What is the difference between union() and connect()?

On Thu, Apr 2, 2020 at 10:33 AM Manas Kale <[hidden email]> wrote:

Re: Perform processing only when watermark updates, buffer data otherwise

Timo Walther
Hi Manas,

First of all, once watermarks have been assigned at the source level,
Flink operators usually take care of handling them, so you do not need
to assign them again after a union().

In the case of a `union()`, the subsequent operator advances its
internal event-time clock, and emits a new watermark, only once all input
streams (and all of their parallel instances) have reached a common event
time. In other words, its watermark is the minimum of its inputs' watermarks.
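The combination rule described above can be sketched in plain Java. This is a hypothetical model with made-up names, not Flink's internal implementation; in particular, it ignores idle inputs, which real Flink handles separately.

```java
import java.util.Arrays;

// Hypothetical plain-Java model of how the operator after union() combines watermarks.
// Not Flink's internal code; idle inputs, for example, are ignored here.
class UnionWatermarkModel {
    private final long[] channelWatermarks;          // latest watermark per channel (each parallel instance of each input)
    private long operatorWatermark = Long.MIN_VALUE; // the operator's event-time clock

    UnionWatermarkModel(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one channel; returns the operator's new watermark if it advanced, else null. */
    Long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        if (min > operatorWatermark) {
            operatorWatermark = min;
            return min;   // would be emitted downstream
        }
        return null;      // clock unchanged: nothing is emitted
    }
}
```

For example, with three channels at 10, 15 and 12, the operator's clock sits at 10; it only moves to 12 once the channel at 10 advances to 20.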

Your sorting use case can easily be implemented with a
KeyedProcessFunction [1]: buffer your events in a list state and process
them when a timer fires. The documentation also explains how to set a timer.

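If you want to fire when the next watermark arrives, you can register an
event-time timer like:

ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 1);

An event-time timer fires once the watermark reaches its timestamp, so
this one fires as soon as the watermark advances.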
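The buffer-and-timer approach described above can be modeled in plain Java for a single key. This is a sketch, not Flink code: the `buffer` list stands in for a `ListState`, the `TreeSet` for the key's registered event-time timers (Flink deduplicates timers with the same timestamp for the same key), and `advanceWatermark()` for a watermark reaching the operator. All class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

// Hypothetical event type: an event-time timestamp plus a payload.
final class Event {
    final long timestamp;
    final String payload;

    Event(long timestamp, String payload) {
        this.timestamp = timestamp;
        this.payload = payload;
    }

    @Override
    public String toString() {
        return payload + "@" + timestamp;
    }
}

// Plain-Java model of one key of a sorting KeyedProcessFunction; not Flink code.
class SortOnWatermarkModel {
    private long currentWatermark = Long.MIN_VALUE;       // Flink's initial watermark value
    private final List<Event> buffer = new ArrayList<>(); // stand-in for ListState<Event>
    private final TreeSet<Long> timers = new TreeSet<>(); // stand-in for registered event-time timers
    final List<Event> emitted = new ArrayList<>();        // stand-in for the Collector

    // Like processElement(): buffer the event and request a callback on the next watermark.
    void processElement(Event event) {
        buffer.add(event);
        timers.add(currentWatermark + 1);                 // the set deduplicates equal timestamps
    }

    // Like a watermark reaching the operator: fire every timer whose timestamp it has reached.
    void advanceWatermark(long watermark) {
        currentWatermark = watermark;
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onTimer(timers.pollFirst());
        }
    }

    // Like onTimer(): emit, in event-time order, every buffered event the watermark now covers.
    private void onTimer(long timerTimestamp) {
        List<Event> ready = new ArrayList<>();
        List<Event> pending = new ArrayList<>();
        for (Event event : buffer) {
            if (event.timestamp <= currentWatermark) {
                ready.add(event);
            } else {
                pending.add(event);
            }
        }
        ready.sort(Comparator.comparingLong(e -> e.timestamp));
        emitted.addAll(ready);
        buffer.clear();
        buffer.addAll(pending);                           // like writing the remaining events back to list state
        if (!pending.isEmpty()) {
            timers.add(currentWatermark + 1);             // keep events ahead of the watermark from being stranded
        }
    }
}
```

In a real job, processElement() would call `listState.add(event)` and register an event-time timer at `currentWatermark() + 1`, while onTimer() would read, sort, emit, and write back the remaining events with `listState.update(...)`. Re-registering a timer for events still ahead of the watermark matters when no further events arrive for that key.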

`union()` is meant for combining streams of the same type into one
stream in which the order of events does not matter. However, watermarks
still arrive in order, so sorting by event time should not be a problem.

`connect()` is broader than a join: it lets one operator process two
streams, possibly of different types, with shared state (see also the
answer here [2]).
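A rough plain-Java analogy for the difference, not the Flink API: union() needs inputs of one element type and yields a single stream, while connect() keeps two inputs, possibly of different types, that one operator handles with a separate method per input and shared state, much as CoProcessFunction does with processElement1() and processElement2(). The threshold/control-message example and all names are hypothetical, and the real union() interleaves elements in no guaranteed order, whereas this sketch simply concatenates them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java analogy for union() vs connect(); not Flink code.
class UnionVsConnect {

    // union(): all inputs share one element type T and become a single stream.
    // (Flink interleaves them in no guaranteed order; concatenation is a stand-in.)
    @SafeVarargs
    static <T> List<T> union(List<T> first, List<T>... others) {
        List<T> merged = new ArrayList<>(first);
        for (List<T> other : others) {
            merged.addAll(other);
        }
        return merged;
    }

    // connect(): two inputs of different types, one handler per input, shared state.
    // Method names mirror CoProcessFunction's processElement1/processElement2.
    static final class ThresholdFilter {
        private int threshold = Integer.MAX_VALUE;   // shared state, set by the control input
        final List<Integer> out = new ArrayList<>(); // stand-in for the Collector

        // Input 1: data values (int).
        void processElement1(int value) {
            if (value >= threshold) {
                out.add(value);
            }
        }

        // Input 2: control messages of a different type (String) that update the shared state.
        void processElement2(String newThreshold) {
            threshold = Integer.parseInt(newThreshold);
        }
    }
}
```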

I hope I could answer most of your questions. Feel free to ask further
questions.

Regards,
Timo


[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html#process-function
[2] https://stackoverflow.com/questions/52885926/what-is-the-difference-between-flink-join-and-connect

On 02.04.20 12:11, Manas Kale wrote:

Re: Perform processing only when watermark updates, buffer data otherwise

Manas Kale
Hi Timo,
Thanks for the information.

On Thu, Apr 2, 2020 at 9:30 PM Timo Walther <[hidden email]> wrote: