Hi,
I want to perform some processing on events only when the watermark is updated. For all other events, I want to keep buffering them until the watermark arrives.

The main motivation behind doing this is that I have several operators that emit events/messages to a downstream operator. Since events are not guaranteed to arrive at the downstream operator in event-time order, I want to sort them manually when the watermark arrives and only then proceed.

Specifically, I want to first combine multiple streams and then do the above. Something like:

    stream1.union(stream2, stream3)...

One solution I am exploring is using a global window with a trigger that fires only when the watermark updates:

    stream1.union(stream2, stream3)
        .keyBy(...)
        .window(GlobalWindows.create())
        .trigger(new OnWatermarkUpdateTrigger())
        .process(...)

I will store the latest watermark in the trigger's state. In the onElement() method, I will FIRE if the current watermark is different from the stored one.

Is this the best way to implement the functionality described above?
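For illustration, here is a framework-free Java model of the trigger logic described above. It is a sketch under assumptions, not Flink code: `Event` and `WatermarkChangeTrigger` are hypothetical names, a plain field stands in for the trigger's state, a list stands in for the contents of the GlobalWindow, and the firing is modeled as FIRE_AND_PURGE so each event is emitted once. In Flink itself, `onElement()` would read the watermark via `TriggerContext.getCurrentWatermark()`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical event type for this model: an event-time timestamp plus a payload.
final class Event {
    final long timestamp;
    final String payload;

    Event(long timestamp, String payload) {
        this.timestamp = timestamp;
        this.payload = payload;
    }
}

// Hypothetical, framework-free model of the proposed OnWatermarkUpdateTrigger
// on a GlobalWindow; this is NOT Flink's Trigger API.
final class WatermarkChangeTrigger {
    private long storedWatermark = Long.MIN_VALUE;      // stands in for the trigger's state
    private final List<Event> pane = new ArrayList<>(); // stands in for the window contents

    // Called for every element together with the current watermark.
    // Returns the events emitted by a firing (sorted by timestamp), or an empty list.
    List<Event> onElement(Event event, long currentWatermark) {
        pane.add(event);
        if (currentWatermark == storedWatermark) {
            return List.of(); // CONTINUE: watermark unchanged, keep buffering
        }
        storedWatermark = currentWatermark;
        List<Event> fired = new ArrayList<>(pane);
        fired.sort(Comparator.comparingLong(e -> e.timestamp));
        pane.clear(); // assumption: modeled as FIRE_AND_PURGE, not plain FIRE
        return fired;
    }
}
```

Two properties of this model are worth noting: the firing decision is only made when an element arrives, and a firing emits everything buffered so far, including events newer than the watermark (for example, an event with timestamp 5 is emitted by a firing at watermark 2).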
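Also:

* What happens to watermarks after a union operation? Do I have to call assignTimestampsAndWatermarks() again? I guess I will have to, since multiple streams are being combined and Flink needs to know how to resolve the individual watermarks.
* What is the difference between union() and connect()?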
On Thu, Apr 2, 2020 at 10:33 AM Manas Kale <[hidden email]> wrote:
> Hi,
> I want to perform some processing on events only when the watermark is updated. [...]
Hi Manas,
First of all, once watermarks are assigned at the source level, Flink operators usually take care of handling them. In the case of a `union()`, the subsequent operator advances its internal event-time clock and emits a new watermark only once all input streams (and their parallel instances) have reached a common event time. In other words, the operator's watermark is the minimum of the watermarks of all its inputs, so you don't need to call assignTimestampsAndWatermarks() again after the union.

Your sorting use case can be easily done with a KeyedProcessFunction [1]. You can buffer your events in a list state and process them when a timer fires. The documentation also explains how to set a timer. If you want to fire when the next watermark arrives, you can register an event-time timer like:

    ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 1)

The `union()` is meant for combining streams of the same type into one, where the order of the events does not matter. However, watermarks still arrive in order, so sorting by event time should not be a problem.

connect() is broader than a join (see also the answer here [2]). Unlike `union()`, it combines exactly two streams, which may have different types, and you then process them together, e.g. with a CoProcessFunction.

I hope I could answer most of your questions. Feel free to ask further questions.

Regards,
Timo

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html#process-function
[2] https://stackoverflow.com/questions/52885926/what-is-the-difference-between-flink-join-and-connect

On 02.04.20 12:11, Manas Kale wrote:
> Also
>
> * What happens to watermarks after a union operation? Do I have to
> assignTimestampsAndWatermarks() again? I guess I will have to since
> multiple streams are being combined and Flink needs to know how to
> resolve individual watermarks.
> * What is the difference between union() and connect()?
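Timo's two suggestions (the watermark after a union, and buffering until the watermark advances) can be combined in a small framework-free Java model. This is a sketch under assumptions, not Flink code: `TimedEvent` and `SortingOperator` are hypothetical names, a plain `ArrayList` stands in for the list state, and flushing on every watermark advance stands in for an event-time timer registered at `currentWatermark() + 1`. Assuming no late events, each flush emits, in timestamp order, only the buffered events at or below the new watermark.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

// Hypothetical event type for this model: an event-time timestamp plus a payload.
final class TimedEvent {
    final long timestamp;
    final String payload;

    TimedEvent(long timestamp, String payload) {
        this.timestamp = timestamp;
        this.payload = payload;
    }
}

// Hypothetical, framework-free model of a downstream operator after union():
// it buffers events and emits them sorted once the combined watermark advances.
final class SortingOperator {
    private final long[] inputWatermarks;                      // latest watermark per input
    private long currentWatermark = Long.MIN_VALUE;            // the operator's event-time clock
    private final List<TimedEvent> buffer = new ArrayList<>(); // stands in for list state
    private final List<TimedEvent> output = new ArrayList<>(); // what would be emitted downstream

    SortingOperator(int numInputs) {
        inputWatermarks = new long[numInputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    // Like processElement(): only buffer, never emit directly.
    void processElement(TimedEvent event) {
        buffer.add(event);
    }

    // A watermark arrives on one input; the operator clock is the minimum over all inputs.
    void processWatermark(int input, long watermark) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        long combined = Arrays.stream(inputWatermarks).min().getAsLong();
        if (combined > currentWatermark) {
            currentWatermark = combined;
            onWatermarkAdvanced(); // where a timer at (previous watermark + 1) would fire
        }
    }

    // Like onTimer(): sort the buffer and emit everything at or below the watermark.
    private void onWatermarkAdvanced() {
        buffer.sort(Comparator.comparingLong(e -> e.timestamp));
        Iterator<TimedEvent> it = buffer.iterator();
        while (it.hasNext()) {
            TimedEvent e = it.next();
            if (e.timestamp > currentWatermark) {
                break; // buffer is sorted, so all remaining events are newer
            }
            output.add(e);
            it.remove();
        }
    }

    long currentWatermark() { return currentWatermark; }

    List<TimedEvent> output() { return output; }
}
```

In an actual KeyedProcessFunction, processElement() would append to a ListState and register the timer, onTimer() would perform the flush, and Flink itself would track the per-input watermarks; the model only makes that division of work visible.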
Hi Timo,

Thanks for the information.

On Thu, Apr 2, 2020 at 9:30 PM Timo Walther <[hidden email]> wrote:
> Hi Manas,