Re: Perform processing only when watermark updates, buffer data otherwise

Posted by Manas Kale on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Perform-processing-only-when-watermark-updates-buffer-data-otherwise-tp34040p34049.html

Also
  • What happens to watermarks after a union operation? Do I have to call assignTimestampsAndWatermarks() again? I guess I will have to, since multiple streams are being combined and Flink needs to know how to resolve the individual watermarks.
  • What is the difference between union() and connect()?

On Thu, Apr 2, 2020 at 10:33 AM Manas Kale <[hidden email]> wrote:
Hi,
I want to perform some processing on events only when the watermark is updated. Until then, I want to keep buffering all incoming events.
The main motivation is that I have several operators that emit events/messages to a downstream operator. Since events are not guaranteed to arrive at the downstream operator in event-time order, I want to sort the buffered events manually when the watermark arrives and only then proceed.
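
To make the intended behaviour concrete, here is a minimal sketch in plain Java (no Flink dependency) of the buffer-then-sort logic. WatermarkSortBuffer and Event are hypothetical names used only for illustration. The sketch assumes that an event is ready once its timestamp is less than or equal to the watermark.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical helper (not a Flink class): buffers events and releases them
// in event-time order once the watermark has advanced past them.
class WatermarkSortBuffer {

    // Hypothetical event type: an event-time timestamp plus a payload.
    static final class Event {
        final long timestamp;
        final String payload;

        Event(long timestamp, String payload) {
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    // Min-heap ordered by event time; ties are released in no particular order.
    private final PriorityQueue<Event> buffer =
            new PriorityQueue<>(Comparator.comparingLong((Event e) -> e.timestamp));

    private long currentWatermark = Long.MIN_VALUE;

    // Every event is buffered; nothing is emitted on arrival.
    void add(Event e) {
        buffer.add(e);
    }

    // Called with the latest watermark. Returns the buffered events with
    // timestamp <= watermark, sorted by event time. Returns an empty list
    // if the watermark has not advanced since the previous call.
    List<Event> onWatermark(long watermark) {
        List<Event> ready = new ArrayList<>();
        if (watermark <= currentWatermark) {
            return ready;
        }
        currentWatermark = watermark;
        while (!buffer.isEmpty() && buffer.peek().timestamp <= watermark) {
            ready.add(buffer.poll());
        }
        return ready;
    }
}
```

For example, after adding events with timestamps 30, 10 and 20, onWatermark(20) returns the events at 10 and 20 in that order, and the event at 30 stays buffered until a later watermark covers it. An event that arrives behind the current watermark is simply held until the next advance in this sketch; whether that is acceptable depends on how late data should be treated.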

Specifically, I want to first combine multiple streams and then do the above. Something like:
stream1.union(stream2, stream3)...

One solution I am exploring is using a global window with a trigger that will fire only when the watermark updates.
stream1.union(stream2, stream3)
    .keyBy(...)
    .window(GlobalWindows.create())
    .trigger(new OnWatermarkUpdateTrigger())
    .process(...)

I will store the latest watermark in the trigger's state store. In the onElement() method, I will FIRE if the current watermark differs from the stored one.
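
A minimal sketch of that decision, modelled in plain Java rather than against Flink's Trigger API so that it runs on its own. OnWatermarkUpdateDecision and Result are hypothetical names. In an actual trigger I assume the stored value would live in the trigger's partitioned state and the current watermark would come from the trigger context.

```java
// Plain-Java model of the trigger decision (hypothetical, not Flink's Trigger API).
class OnWatermarkUpdateDecision {

    enum Result { CONTINUE, FIRE }

    // Last watermark observed; starts at Long.MIN_VALUE, i.e. "no watermark yet".
    private long lastSeenWatermark = Long.MIN_VALUE;

    // Called once per incoming element with the watermark current at that moment.
    // Fires only if the watermark changed since the previous element.
    Result onElement(long currentWatermark) {
        if (currentWatermark != lastSeenWatermark) {
            lastSeenWatermark = currentWatermark;
            return Result.FIRE;
        }
        return Result.CONTINUE;
    }
}
```

Note that this check runs only when an element arrives, so a watermark update is noticed on the next element rather than at the moment the watermark itself advances.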

Is this the best way to implement the functionality described above?