Posted by Timo Walther
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Perform-processing-only-when-watermark-updates-buffer-data-otherwise-tp34040p34055.html

					Hi Manas,
first of all, once watermarks have been assigned at the source, Flink operators usually take care of handling and forwarding them, so there is usually no need to call `assignTimestampsAndWatermarks()` again after a `union()`.
In case of a `union()`, the subsequent operator advances its internal event-time clock and emits a new watermark only once all input streams (and their parallel instances) have reached a common event time.
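The watermark rule for `union()` described above can be illustrated with a small plain-Java simulation. This is a sketch for intuition only, not Flink's internal code; `UnionWatermarkTracker` and its methods are hypothetical names. The operator remembers the highest watermark seen on each input channel, takes the minimum across channels as its own event-time clock, and emits a watermark only when that minimum advances.

```java
import java.util.Arrays;
import java.util.OptionalLong;

// Hypothetical class: a simplified model of an operator with several input
// channels (e.g. after union()). Not Flink source code.
public class UnionWatermarkTracker {
    private final long[] channelWatermarks;
    private long currentWatermark = Long.MIN_VALUE;

    public UnionWatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        // No watermark has been seen on any channel yet.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one channel; returns the operator's new watermark if it advanced. */
    public OptionalLong onWatermark(int channel, long watermark) {
        // A single channel's watermark never moves backwards.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        // The operator's event-time clock is the minimum over all channels.
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        if (min > currentWatermark) {
            currentWatermark = min;
            return OptionalLong.of(min);
        }
        return OptionalLong.empty();
    }

    public long currentWatermark() {
        return currentWatermark;
    }

    public static void main(String[] args) {
        UnionWatermarkTracker tracker = new UnionWatermarkTracker(3);
        System.out.println(tracker.onWatermark(0, 10)); // OptionalLong.empty
        System.out.println(tracker.onWatermark(1, 12)); // OptionalLong.empty
        System.out.println(tracker.onWatermark(2, 8));  // OptionalLong[8]
        System.out.println(tracker.onWatermark(2, 20)); // OptionalLong[10]
    }
}
```

A slow input therefore holds back event time for the whole unioned stream until it catches up.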
Your sorting use case can easily be implemented with a KeyedProcessFunction [1]: buffer your events in a list state and process them when a timer fires. The documentation also explains how to register a timer.
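A minimal sketch of the buffer-then-sort idea, assuming each event carries an event-time timestamp. To keep it runnable without a Flink cluster it is plain Java: the `ArrayList` stands in for the `ListState` of a KeyedProcessFunction, `processElement()` and `onTimer()` mirror the two callbacks, and the `EventTimeSorter` and `Event` names are hypothetical. When the timer fires, every buffered event whose timestamp is at or below the watermark is emitted in timestamp order; later events stay buffered for the next timer.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for the per-key logic of a KeyedProcessFunction.
public class EventTimeSorter {
    /** Hypothetical event type: an event-time timestamp plus a payload. */
    public static final class Event {
        public final long timestamp;
        public final String payload;

        public Event(long timestamp, String payload) {
            this.timestamp = timestamp;
            this.payload = payload;
        }

        @Override
        public String toString() {
            return payload + "@" + timestamp;
        }
    }

    // Plays the role of ListState<Event> in the real operator.
    private final List<Event> buffer = new ArrayList<>();

    /** Mirrors processElement(): only buffer the event. */
    public void processElement(Event e) {
        buffer.add(e);
    }

    /** Mirrors onTimer(): emit all events the watermark has passed, sorted by timestamp. */
    public List<Event> onTimer(long watermark) {
        List<Event> ready = new ArrayList<>();
        List<Event> pending = new ArrayList<>();
        for (Event e : buffer) {
            if (e.timestamp <= watermark) {
                ready.add(e);
            } else {
                pending.add(e);
            }
        }
        ready.sort(Comparator.comparingLong(ev -> ev.timestamp));
        // Keep only the events that are still ahead of the watermark.
        buffer.clear();
        buffer.addAll(pending);
        return ready;
    }

    public int buffered() {
        return buffer.size();
    }

    public static void main(String[] args) {
        EventTimeSorter sorter = new EventTimeSorter();
        sorter.processElement(new Event(7, "c"));
        sorter.processElement(new Event(3, "a"));
        sorter.processElement(new Event(12, "d"));
        sorter.processElement(new Event(5, "b"));
        System.out.println(sorter.onTimer(10)); // [a@3, b@5, c@7]
        System.out.println(sorter.buffered());  // 1
    }
}
```

In an actual KeyedProcessFunction the buffer would be read from and written back to `ListState`, so it is scoped per key and included in checkpoints.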
If you want the timer to fire when the next watermark arrives, register an event-time timer just past the current watermark:
ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 1)
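Why `currentWatermark() + 1` fires on the next watermark: an event-time timer fires as soon as the watermark reaches or passes the timer's timestamp, so a timer set one unit past the current watermark fires on the first watermark that advances the clock. The model below is plain Java with a hypothetical `SimpleTimerService` class (not Flink's `TimerService`). It stores timers in a `TreeSet`, which mirrors Flink's documented behaviour of keeping at most one timer per key and timestamp, so registering the same timer for every buffered element is harmless.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical model of event-time timers for a single key; not Flink's TimerService.
public class SimpleTimerService {
    private long currentWatermark = Long.MIN_VALUE;
    // A set, so registering the same timestamp twice yields a single timer.
    private final TreeSet<Long> timers = new TreeSet<>();

    public long currentWatermark() {
        return currentWatermark;
    }

    public void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    /** Advances the watermark and returns the timers that fire, in timestamp order. */
    public List<Long> advanceWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        if (watermark <= currentWatermark) {
            return fired; // not an advance, nothing fires
        }
        currentWatermark = watermark;
        while (!timers.isEmpty() && timers.first() <= watermark) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }

    public static void main(String[] args) {
        SimpleTimerService ts = new SimpleTimerService();
        ts.advanceWatermark(100);
        // Three elements arrive before the next watermark; each registers the same timer.
        for (int i = 0; i < 3; i++) {
            ts.registerEventTimeTimer(ts.currentWatermark() + 1);
        }
        System.out.println(ts.advanceWatermark(100)); // []
        System.out.println(ts.advanceWatermark(105)); // [101]
    }
}
```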
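`union()` is meant for combining streams of the same type into one, where the order of events does not matter. However, watermarks still arrive in order, so sorting by event time should not be a problem.
`connect()` is more general: it combines exactly two streams, which may have different types, and lets you handle each input separately (e.g. in a `CoProcessFunction`). It is also broader than a join (see also the answer here [2]).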
I hope I could answer most of your questions. Feel free to ask further 
questions.
Regards,
Timo
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html#process-function
[2] https://stackoverflow.com/questions/52885926/what-is-the-difference-between-flink-join-and-connect

On 02.04.20 12:11, Manas Kale wrote:
> Also
> 
>   * What happens to watermarks after a union operation? Do I have to
>     assignTimestampsAndWatermarks() again? I guess I will have to, since
>     multiple streams are being combined and Flink needs to know how to
>     resolve individual watermarks.
>   * What is the difference between union() and connect()?
> 
> 
> On Thu, Apr 2, 2020 at 10:33 AM Manas Kale <[hidden email]> wrote:
> 
>     Hi,
>     I want to perform some processing on events only when the watermark
>     is updated. Otherwise, for all other events, I want to keep
>     buffering them till the watermark arrives.
>     The main motivation behind doing this is that I have several
>     operators that emit events/messages to a downstream operator. Since
>     the order in which events arrive at the downstream operator is not
>     guaranteed to follow event-time order, I want to manually sort
>     events when the watermark arrives and only then proceed.
> 
>     Specifically, I want to first combine multiple streams and then do
>     the above. Something like:
>     stream1.union(stream2, stream3)...
> 
>     One solution I am exploring is using a global window with a trigger
>     that will fire only when the watermark updates:
>     stream1.union(stream2, stream3)
>         .keyBy(...)
>         .window(GlobalWindows.create())
>         .trigger(new OnWatermarkUpdateTrigger())
>         .process(...)
> 
>     I will store the latest watermark in the trigger's state. In
>     the onElement() method, I will FIRE if the current watermark is
>     different from the stored one.
> 
>     Is this the best way to implement the functionality described above?
>
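The `onElement()` rule proposed in the quoted message can be modelled in a few lines of plain Java. The class name `WatermarkChangeRule` is hypothetical; in an actual Flink `Trigger`, the last-seen value would be kept in trigger state and the current watermark read from `TriggerContext.getCurrentWatermark()`. The model makes one property of the design visible: because the check runs per element, the window fires on the first element that arrives after the watermark has moved, not when the watermark itself arrives.

```java
// Hypothetical model of the proposed OnWatermarkUpdateTrigger's onElement() rule.
public class WatermarkChangeRule {
    // Plays the role of the value kept in the trigger's state.
    private long lastSeenWatermark = Long.MIN_VALUE;

    /** Returns true (FIRE) if the watermark changed since the previous element, false (CONTINUE) otherwise. */
    public boolean onElement(long currentWatermark) {
        if (currentWatermark != lastSeenWatermark) {
            lastSeenWatermark = currentWatermark;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        WatermarkChangeRule rule = new WatermarkChangeRule();
        // The watermark observed when each of five elements arrives.
        long[] watermarkSeenByEachElement = {Long.MIN_VALUE, 10, 10, 10, 25};
        for (long wm : watermarkSeenByEachElement) {
            System.out.print(rule.onElement(wm) ? "FIRE " : "CONTINUE ");
        }
        System.out.println();
        // prints: CONTINUE FIRE CONTINUE CONTINUE FIRE
    }
}
```

If no element arrives for a while after a watermark, this rule does not fire at all for that watermark, which is one reason the timer-based KeyedProcessFunction approach can be simpler to reason about.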