Hello, I want to join two streams based on an event-time window. Each stream has its own watermark: one has a periodic watermark and the other has a punctuated watermark. Are the watermarks used to trigger the join? If yes, which one, and how is it used? Thank you and best regards, Wei

Periodic and punctuated watermarks differ only in the way they are generated; afterwards they are treated the same. An operator with two input streams always syncs its own watermark to the watermarks of both input streams, i.e., to the "slower" watermark of the two inputs. So if the left input says it is 12:14 and the right says it is 11:53, the operator will have an internal time of 11:53 and emit watermarks according to that time. Hope that helps, Fabian
2017-07-28 15:00 GMT+02:00 xie wei <[hidden email]>:

Hello Fabian, thank you for your answer! Does this mean that the operator waits until it has received watermarks from both input streams and then emits the "slower" one? Best regards, Wei

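The "slower watermark wins" rule can be modeled in a few lines of plain Java. This is a sketch, not Flink's API: the class and method names (TwoInputWatermark, onWatermark1, onWatermark2) are hypothetical, and it assumes each input's watermarks never go backwards. It keeps the latest watermark per input, starting at Long.MIN_VALUE, and only emits when the minimum of the two increases. In this model that is why nothing is emitted until both inputs have reported a watermark.

```java
// Plain-Java model (no Flink dependency) of a two-input operator's watermark.
// Hypothetical names; a sketch of the "slower watermark wins" rule only.
final class TwoInputWatermark {
    // An input that has not sent a watermark yet counts as "minus infinity".
    private long input1 = Long.MIN_VALUE;
    private long input2 = Long.MIN_VALUE;
    private long current = Long.MIN_VALUE;

    // Watermark from input 1; returns the watermark to emit downstream, or null if unchanged.
    Long onWatermark1(long wm) {
        input1 = Math.max(input1, wm); // assumption: per-input watermarks never go backwards
        return advance();
    }

    // Watermark from input 2; same contract as onWatermark1.
    Long onWatermark2(long wm) {
        input2 = Math.max(input2, wm);
        return advance();
    }

    long currentWatermark() {
        return current;
    }

    // The operator's time is the minimum of both inputs: the "slower" one wins.
    private Long advance() {
        long combined = Math.min(input1, input2);
        if (combined > current) {
            current = combined;
            return combined;
        }
        return null;
    }
}
```

With minutes-of-day timestamps, feeding 12:14 on input 1 returns null because input 2 is still at Long.MIN_VALUE. Feeding 11:53 on input 2 then returns 11:53. A later 13:20 on input 1 changes nothing until input 2 also moves forward.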
Hi Fabian, how do I order by the merge time? Let's say I merge the streams at T1. I want to drop the T2 merge if T2 < T1. Depending on when data arrives from each individual stream and when the merge happens, the results become out of order. Any thoughts would be really appreciated. Regards, Vijay Raajaa GS
On Jul 31, 2017 1:14 AM, "wei" <[hidden email]> wrote:
|
Hi, @Wei: You can implement very different behavior using a CoProcessFunction. However, if your operator is time-based, the logical time of the operator will be the minimum time of both streams (the time of the "slower" watermark). @Vijay: I did not understand what your requirements are. Do you want to join or merge streams? Those are two different things, and this thread discusses joins, not merging. Best, Fabian
2017-07-31 4:24 GMT+02:00 G.S.Vijay Raajaa <[hidden email]>:
|
My bad, I meant only join. I am currently using keyBy on a timestamp that is common across the streams. Regards, Vijay Raajaa GS
On Mon, Jul 31, 2017 at 1:16 PM, Fabian Hueske <[hidden email]> wrote:
|
Hi Vijay, there are many ways to implement joins with a stateful CoProcessFunction. It gives you access to the timestamps of records, and you can register timers that fire when a certain time is reached. It is basically up to you how you join and emit data; for example, you can drop late data or emit it. Please note that records are emitted either with their current timestamp (if emitted in processElement1()/processElement2()) or with the timestamp of the timer that fired (if emitted in onTimer()). Hope this helps, Fabian
2017-07-31 9:48 GMT+02:00 G.S.Vijay Raajaa <[hidden email]>:
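A timer-based join can be modeled in plain Java to make the two emission paths concrete. The sketch below does not use Flink. TimerJoinModel, JoinedRow and the maxWait parameter are hypothetical. It keeps one buffered record per key and side. If the other side is already present, it joins right away and emits with the current record's timestamp, which is the processElement case. Otherwise it registers an event-time "timer" at timestamp + maxWait. Once the watermark passes that timer, any still-unmatched record is emitted with the timer's timestamp, which is the onTimer case, and its state is cleared. As this model's own choice, records whose timestamp is at or behind the current watermark are dropped as late.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// One output row: the joined values plus the timestamp it is emitted with.
final class JoinedRow {
    final String key;
    final String left;   // null if the left side never arrived
    final String right;  // null if the right side never arrived
    final long timestamp;

    JoinedRow(String key, String left, String right, long timestamp) {
        this.key = key;
        this.left = left;
        this.right = right;
        this.timestamp = timestamp;
    }
}

// Plain-Java model of a keyed, timer-based two-input join (hypothetical; no Flink dependency).
final class TimerJoinModel {
    private static final class Buffered {
        final String value;
        final long ts;
        Buffered(String value, long ts) { this.value = value; this.ts = ts; }
    }

    private final long maxWait; // hypothetical: how long to wait for the other side
    private long watermark = Long.MIN_VALUE;
    private final Map<String, Buffered> leftState = new HashMap<>();
    private final Map<String, Buffered> rightState = new HashMap<>();
    private final TreeMap<Long, List<String>> timers = new TreeMap<>(); // firing time -> keys

    final List<JoinedRow> output = new ArrayList<>();
    final List<String> droppedLate = new ArrayList<>();

    TimerJoinModel(long maxWait) { this.maxWait = maxWait; }

    void processLeft(String key, String value, long ts) { process(key, value, ts, true); }

    void processRight(String key, String value, long ts) { process(key, value, ts, false); }

    private void process(String key, String value, long ts, boolean isLeft) {
        if (ts <= watermark) {          // late record: this model drops it
            droppedLate.add(value);
            return;
        }
        Map<String, Buffered> own = isLeft ? leftState : rightState;
        Map<String, Buffered> other = isLeft ? rightState : leftState;
        Buffered match = other.remove(key);
        if (match != null) {
            // Like processElement1/2: emit with the current record's timestamp.
            output.add(isLeft
                    ? new JoinedRow(key, value, match.value, ts)
                    : new JoinedRow(key, match.value, value, ts));
        } else {
            own.put(key, new Buffered(value, ts));
            timers.computeIfAbsent(ts + maxWait, t -> new ArrayList<>()).add(key);
        }
    }

    // Advances event time and fires every timer whose time is <= the new watermark.
    void advanceWatermark(long wm) {
        if (wm <= watermark) return;    // watermarks never go backwards
        watermark = wm;
        while (!timers.isEmpty() && timers.firstKey() <= wm) {
            Map.Entry<Long, List<String>> timer = timers.pollFirstEntry();
            long time = timer.getKey();
            for (String key : timer.getValue()) {
                // Like onTimer(): emit unmatched data with the timer's timestamp, then clear state.
                Buffered l = expire(leftState, key, time);
                Buffered r = expire(rightState, key, time);
                if (l != null || r != null) {
                    output.add(new JoinedRow(key, l == null ? null : l.value,
                                             r == null ? null : r.value, time));
                }
            }
        }
    }

    // Removes and returns the buffered entry for key if its wait time has elapsed.
    private Buffered expire(Map<String, Buffered> state, String key, long time) {
        Buffered b = state.get(key);
        if (b != null && b.ts + maxWait <= time) {
            state.remove(key);
            return b;
        }
        return null;
    }
}
```

Take maxWait = 5. A left record a@10 followed by a right record a@12 yields (a, L1, R1) with timestamp 12. An unmatched left record b@11 is emitted as (b, L2, null) with timestamp 16 once the watermark reaches 20. A right record at 15 that arrives after that is dropped as late. Emitting unmatched rows is one design choice with an outer-join flavor. An inner join would simply discard them in the timer.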
|
Hi Fabian, currently I am trying to assign watermarks on the keyed stream. Please find a snippet of the code below for better understanding:

    List<String> names = new ArrayList<>();
    names.add("stream_a");
    names.add("stream_b");
    DataStream<String> messageStream = env.addSource(
            new FlinkKafkaConsumer08<>(names, new SimpleStringSchema(), properties));
    KeyedStream<Tuple2<String, JsonObject>, Tuple> pojo =
            messageStream.map(new JsonDeserializerv5()).keyBy(0);
    SingleOutputStreamOperator<Tuple2<String, JsonObject>> watermarkStream =
            pojo.assignTimestampsAndWatermarks(new TimestampExtractorMergerv5());
    DataStream<JsonObject> merge_stream =
            watermarkStream.keyBy(0).countWindow(2).apply(new JsonMergerv5());

The above snippet does a merge on the timestamp (field 0 of the tuple). However, the apply function receives records out of order: even when the streams are joined at t2, which is less than the watermark, they still get processed by the apply function. Kindly let me know if I am not using watermarks properly or have misunderstood how they are used. Regards, Vijay Raajaa G S
On Mon, Jul 31, 2017 at 2:09 PM, Fabian Hueske <[hidden email]> wrote:
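The countWindow(2) call in the snippet above is one likely source of the ordering issue. In Flink, a count window fires as soon as it has collected the given number of elements, because it is built on global windows with a count trigger. Watermarks therefore never trigger it, and the apply function sees records in arrival order. The plain-Java model below has no Flink dependency, and TimedEvent, CountTriggerModel and WatermarkBufferModel are hypothetical names. It contrasts that behavior with an event-time buffer that only releases records once the watermark has passed their timestamps.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// A record with an event-time timestamp (hypothetical model type).
final class TimedEvent {
    final String value;
    final long ts;
    TimedEvent(String value, long ts) { this.value = value; this.ts = ts; }
}

// Count trigger: fires every `size` elements in arrival order; timestamps and watermarks are ignored.
final class CountTriggerModel {
    private final int size;
    private final List<String> buffer = new ArrayList<>();
    final List<List<String>> fired = new ArrayList<>();

    CountTriggerModel(int size) { this.size = size; }

    void onElement(TimedEvent e) {
        buffer.add(e.value);
        if (buffer.size() == size) {
            fired.add(new ArrayList<>(buffer)); // emit a copy, then start a new window
            buffer.clear();
        }
    }
}

// Event-time buffer: holds elements until the watermark passes them, then releases them by timestamp.
final class WatermarkBufferModel {
    private final PriorityQueue<TimedEvent> buffer =
            new PriorityQueue<>((a, b) -> Long.compare(a.ts, b.ts));
    final List<String> released = new ArrayList<>();

    void onElement(TimedEvent e) { buffer.add(e); }

    void onWatermark(long wm) {
        while (!buffer.isEmpty() && buffer.peek().ts <= wm) {
            released.add(buffer.poll().value);
        }
    }
}
```

Suppose records arrive as x@20, y@10, z@15, w@30. The count model fires [x, y] and then [z, w]. The event-time buffer, after a watermark of 25, releases y, z, x and keeps w. In Flink itself, an event-time window assigner such as TumblingEventTimeWindows would take the place of countWindow(2). Treat that as a pointer to check against your Flink version rather than a drop-in fix.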
|