Posted by
Timo Walther on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Perform-processing-only-when-watermark-updates-buffer-data-otherwise-tp34040p34055.html
Hi Manas,
first of all, once watermarks have been assigned at the source level,
Flink operators usually take care of handling and forwarding them, so you
do not need to call assignTimestampsAndWatermarks() again after a `union()`.
In case of a `union()`, the subsequent operator will advance its
internal event-time clock and emit a new watermark only once all input
streams (and their parallel instances) have reached a common event-time.
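To make the rule above concrete, here is a minimal plain-Java sketch (no Flink dependency) of an operator that tracks one watermark per input and only advances its own clock to the minimum of them. The class and method names (`UnionWatermarkModel`, `onInputWatermark`) are made up for illustration and are not Flink APIs; Flink does this bookkeeping internally.

```java
import java.util.Arrays;

// Hypothetical model, not a Flink class: an operator with several inputs
// derives its own watermark as the minimum over all input watermarks.
final class UnionWatermarkModel {
    private final long[] inputWatermarks;
    private long combined = Long.MIN_VALUE;

    UnionWatermarkModel(int numInputs) {
        inputWatermarks = new long[numInputs];
        // Until an input has sent a watermark, it holds the operator back.
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one input and returns the operator's watermark. */
    long onInputWatermark(int input, long watermark) {
        // A watermark never moves backwards on a single input.
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        long min = Arrays.stream(inputWatermarks).min().getAsLong();
        // The operator's own clock never moves backwards either.
        combined = Math.max(combined, min);
        return combined;
    }
}

final class UnionWatermarkDemo {
    public static void main(String[] args) {
        UnionWatermarkModel op = new UnionWatermarkModel(2);
        // Input 1 has not reported yet, so the operator stays at Long.MIN_VALUE.
        System.out.println(op.onInputWatermark(0, 100));
        // Both inputs have reported; the slower one (50) decides.
        System.out.println(op.onInputWatermark(1, 50));
        // Input 1 overtakes input 0, so input 0 (100) now decides.
        System.out.println(op.onInputWatermark(1, 200));
    }
}
```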
Your sorting use case can be easily done with a KeyedProcessFunction
[1]. You can buffer your events in a list state, and process them when a
timer fires. The documentation also explains how to set a timer.
If you want to fire when the next watermark arrives, you can set a timer
like:
ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 1);
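The buffering idea can be sketched without any Flink dependency. The plain-Java model below is only an illustration under stated assumptions: `Event` and `SortingBuffer` are hypothetical names, `add()` stands in for what processElement() would do, and `onWatermark()` stands in for onTimer(). In a real KeyedProcessFunction the list would be kept in a ListState, and the release would be driven by an event-time timer, which fires once the watermark reaches the timer's timestamp.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical event type: a timestamp plus some payload.
final class Event {
    final long timestamp;
    final String payload;

    Event(long timestamp, String payload) {
        this.timestamp = timestamp;
        this.payload = payload;
    }
}

// Hypothetical model of "buffer events, release them in event-time order
// once the watermark has passed them".
final class SortingBuffer {
    // In a KeyedProcessFunction this would be keyed list state.
    private final List<Event> buffer = new ArrayList<>();

    /** Counterpart of processElement(): only buffer, emit nothing. */
    void add(Event e) {
        buffer.add(e);
    }

    /** Counterpart of onTimer(): release all events at or below the watermark, sorted. */
    List<Event> onWatermark(long watermark) {
        List<Event> ready = new ArrayList<>();
        List<Event> remaining = new ArrayList<>();
        for (Event e : buffer) {
            if (e.timestamp <= watermark) {
                ready.add(e);
            } else {
                remaining.add(e);
            }
        }
        // List.sort is stable, so events with equal timestamps keep arrival order.
        ready.sort(Comparator.comparingLong((Event e) -> e.timestamp));
        buffer.clear();
        buffer.addAll(remaining);
        return ready;
    }
}

final class SortingBufferDemo {
    public static void main(String[] args) {
        SortingBuffer buffer = new SortingBuffer();
        buffer.add(new Event(30, "c"));
        buffer.add(new Event(10, "a"));
        buffer.add(new Event(50, "d"));
        buffer.add(new Event(20, "b"));
        // Releases the events with timestamps 10, 20, 30; 50 stays buffered.
        for (Event e : buffer.onWatermark(30)) {
            System.out.println(e.timestamp + " " + e.payload);
        }
    }
}
```

Events that are later than the current watermark stay in the buffer, so each watermark advance only emits the prefix of the stream that can no longer be reordered by late-arriving input.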
The `union()` is meant for combining streams of the same data type into one
where the order of the events does not matter. However, watermarks
still arrive in order, so sorting by event-time should not be a problem.
connect() is broader than a join (see also the answer here [2]).
I hope I could answer most of your questions. Feel free to ask further
questions.
Regards,
Timo
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html#process-function
[2] https://stackoverflow.com/questions/52885926/what-is-the-difference-between-flink-join-and-connect

On 02.04.20 12:11, Manas Kale wrote:
> Also
>
> * What happens to watermarks after a union operation? Do I have to
> assignTimestampsAndWatermarks() again? I guess I will have to since
> multiple streams are being combined and Flink needs to know how to
> resolve individual watermarks.
> * What is the difference between union() and connect()?
>
>
> On Thu, Apr 2, 2020 at 10:33 AM Manas Kale <[hidden email]> wrote:
>
> Hi,
> I want to perform some processing on events only when the watermark
> is updated. Otherwise, for all other events, I want to keep
> buffering them till the watermark arrives.
> The main motivation behind doing this is that I have several
> operators that emit events/messages to a downstream operator. Since
> the order in which events arrive at the downstream operator is not
> guaranteed to be in chronological event time, I want to manually
> sort events when the watermark arrives and only then proceed.
>
> Specifically, I want to first combine multiple streams and then do
> the above. Something like:
> stream1.union(stream2, stream3)...
>
> One solution I am exploring is using a global window with a trigger
> that will fire only when the watermark updates.
> stream1.union(stream2, stream3).
> keyBy(...).
> window(GlobalWindows.create()).
> trigger(new OnWatermarkUpdateTrigger()).
> process(...)
>
> I will store the latest watermark in the trigger's state store. In
> the onElement() method, I will FIRE if the current watermark is
> different than the stored one.
>
> Is this the best way to implement the functionality described above?
>