Re: Perform processing only when watermark updates, buffer data otherwise

Posted by Manas Kale on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Perform-processing-only-when-watermark-updates-buffer-data-otherwise-tp34040p34098.html

Hi Timo,
Thanks for the information.

On Thu, Apr 2, 2020 at 9:30 PM Timo Walther <[hidden email]> wrote:
Hi Manas,

First of all, once you have assigned watermarks at the source level,
Flink operators usually take care of handling them downstream.

In the case of a `union()`, the subsequent operator will advance its
internal event-time clock and emit a new watermark only once all input
streams (and their parallel instances) have reached that event time.
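
To make the rule above concrete, here is a minimal plain-Java model of it. This is not Flink API: the class `UnionWatermarkModel` and its methods are hypothetical names used only for illustration. It assumes the operator's watermark is the minimum of the latest watermarks seen on each input, which is the behaviour the paragraph above describes.

```java
import java.util.Arrays;

/** Plain-Java model (not Flink API) of how an operator with several inputs combines watermarks. */
class UnionWatermarkModel {
    // Latest watermark seen on each input; Long.MIN_VALUE means "none yet".
    private final long[] inputWatermarks;
    // Watermark this operator has emitted downstream so far.
    private long emitted = Long.MIN_VALUE;

    UnionWatermarkModel(int numInputs) {
        inputWatermarks = new long[numInputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one input; returns true if the operator's own watermark advanced. */
    boolean onWatermark(int input, long watermark) {
        // Watermarks never move backwards on a single input.
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        // The operator can only be as far along as its slowest input.
        long min = Arrays.stream(inputWatermarks).min().getAsLong();
        if (min > emitted) {
            emitted = min;
            return true;
        }
        return false;
    }

    long currentWatermark() {
        return emitted;
    }
}
```

With three inputs, watermarks of 10 and 7 on the first two inputs do not advance the model; it only moves (to 5) once the third input reports 5.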

Your sorting use case can be easily done with a KeyedProcessFunction
[1]. You can buffer your events in a list state, and process them when a
timer fires. The documentation also explains how to set a timer.

If you want to fire when the next watermark arrives, you can register an
event-time timer like:

ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 1);
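
The buffer-then-sort idea can be sketched without any Flink dependency. The following is a plain-Java model, not Flink API: `WatermarkSortBuffer`, `onElement` and `onWatermark` are hypothetical names. `onElement` stands in for `processElement()` (buffer only), and `onWatermark` stands in for the `onTimer()` callback that runs once the watermark passes the registered timer. It assumes late events are simply dropped and that events with equal timestamps may be released in any order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Plain-Java model (not Flink API) of "buffer events, release them sorted when the watermark advances". */
class WatermarkSortBuffer<T> {
    /** An event paired with its event-time timestamp. */
    static final class Timestamped<T> {
        final long timestamp;
        final T value;

        Timestamped(long timestamp, T value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Min-heap on timestamp; plays the role of the list state in the real operator.
    private final PriorityQueue<Timestamped<T>> buffer =
            new PriorityQueue<>(Comparator.comparingLong((Timestamped<T> e) -> e.timestamp));
    private long currentWatermark = Long.MIN_VALUE;

    /** Buffers the event; returns false if it is late (at or behind the watermark) and was dropped. */
    boolean onElement(long timestamp, T value) {
        if (timestamp <= currentWatermark) {
            return false;
        }
        buffer.add(new Timestamped<>(timestamp, value));
        return true;
    }

    /** Releases, in timestamp order, every buffered event with timestamp <= the new watermark. */
    List<T> onWatermark(long watermark) {
        List<T> out = new ArrayList<>();
        if (watermark <= currentWatermark) {
            return out; // watermark did not advance, nothing to do
        }
        currentWatermark = watermark;
        while (!buffer.isEmpty() && buffer.peek().timestamp <= watermark) {
            out.add(buffer.poll().value);
        }
        return out;
    }
}
```

In a real KeyedProcessFunction the buffer would live in keyed list state rather than on the heap, so that it is checkpointed and scoped per key.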

`union()` is meant for combining streams of the same data type into one
stream where the order of events does not matter. However, watermarks
still arrive in order, so sorting by event time should not be a problem.

connect() is broader than a join (see also the answer here [2]).

I hope I could answer most of your questions. Feel free to ask further
questions.

Regards,
Timo


[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html#process-function
[2] https://stackoverflow.com/questions/52885926/what-is-the-difference-between-flink-join-and-connect



On 02.04.20 12:11, Manas Kale wrote:
> Also
>
>   * What happens to watermarks after a union operation? Do I have to
>     call assignTimestampsAndWatermarks() again? I guess I will have to, since
>     multiple streams are being combined and Flink needs to know how to
>     resolve individual watermarks.
>   * What is the difference between union() and connect()?
>
>
> On Thu, Apr 2, 2020 at 10:33 AM Manas Kale <[hidden email]> wrote:
>
>     Hi,
>     I want to perform some processing on events only when the watermark
>     is updated. Otherwise, for all other events, I want to keep
>     buffering them till the watermark arrives.
>     The main motivation behind doing this is that I have several
>     operators that emit events/messages to a downstream operator. Since
>     the order in which events arrive at the downstream operator is not
>     guaranteed to be in chronological event time, I want to manually
>     sort events when the watermark arrives and only then proceed.
>
>     Specifically, I want to first combine multiple streams and then do
>     the above. Something like:
>     stream1.union(stream2, stream3)...
>
>     One solution I am exploring is using a global window with a trigger
>     that will fire only when the watermark updates.
>     stream1.union(stream2, stream3).
>     keyBy(...).
>     window(GlobalWindows.create()).
>     trigger(new OnWatermarkUpdateTrigger()).
>     process(...)
>
>     I will store the latest watermark in the trigger's state store. In
>     the onElement() method, I will FIRE if the current watermark is
>     different than the stored one.
>
>     Is this the best way to implement the functionality described above?
>