Is watermark used by joining two streams


Is watermark used by joining two streams

xie wei
Hello,

I want to join two streams based on an event-time window. Each stream has its own watermarks: one uses periodic watermarks and the other uses punctuated watermarks.
Are the watermarks used to trigger the join? If yes, which one is used, and how?

Thank you and best regards
Wei



Re: Is watermark used by joining two streams

Fabian Hueske-2
Periodic and punctuated watermarks only differ in the way that they are generated. Afterwards they are treated the same.
An operator with two input streams will always sync its own watermarks to the watermarks of both input streams, i.e., to the "slower" watermark of both inputs.
So if the left input says it is 12:14 and the right says it is 11:53, the operator will have an internal time of 11:53 and emit watermarks according to that time.

Hope that helps,
Fabian
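
The rule described above (the operator follows the "slower" of its two input watermarks) can be illustrated with a small plain-Java model. This is only a sketch and not Flink code: the class name TwoInputWatermarkSketch and its methods are made up for illustration, and times are expressed as minutes since midnight. It assumes each input starts with "no watermark yet" (Long.MIN_VALUE), so the operator's time cannot advance until both inputs have reported a watermark.

```java
/** Toy model of how a two-input operator combines its input watermarks (not Flink code). */
public class TwoInputWatermarkSketch {
    // Latest watermark seen per input; Long.MIN_VALUE means "none received yet".
    private final long[] inputWatermarks = {Long.MIN_VALUE, Long.MIN_VALUE};
    private long operatorTime = Long.MIN_VALUE;

    /**
     * Records a watermark from input 0 (left) or input 1 (right).
     * Returns true if the operator's own event time advanced as a result.
     */
    public boolean onWatermark(int input, long watermark) {
        // A watermark never moves backwards on its own input.
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        // The operator's time is the minimum over both inputs.
        long combined = Math.min(inputWatermarks[0], inputWatermarks[1]);
        if (combined > operatorTime) {
            operatorTime = combined;
            return true; // a real operator would forward this watermark downstream
        }
        return false;
    }

    public long operatorTime() {
        return operatorTime;
    }

    /** Minutes since midnight, to keep the example readable. */
    static long hhmm(int hours, int minutes) {
        return hours * 60L + minutes;
    }

    public static void main(String[] args) {
        TwoInputWatermarkSketch op = new TwoInputWatermarkSketch();
        System.out.println(op.onWatermark(0, hhmm(12, 14)));   // false: right input has no watermark yet
        System.out.println(op.onWatermark(1, hhmm(11, 53)));   // true: operator time becomes 11:53
        System.out.println(op.operatorTime() == hhmm(11, 53)); // true
    }
}
```

Whether a watermark was generated periodically or as a punctuation plays no role in this model; only its value matters once it reaches the operator.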



AW: Is watermark used by joining two streams

xie wei

Hello Fabian,

Thank you for your answer!

Does this mean that the operator waits until it has received watermarks from both input streams and then emits the “slower” one?

Best regards,
Wei


Re: AW: Is watermark used by joining two streams

G.S.Vijay Raajaa
Hi Fabian,

How do I order results by merge time? Let's say I merge the streams at T1. I want to drop a merge at T2 if T2 < T1. Depending on when data arrives from the individual streams and when the merge happens, the results come out of order. Any thoughts would be really appreciated.

Regards,
Vijay Raajaa GS


Re: AW: Is watermark used by joining two streams

Fabian Hueske-2
Hi,

@Wei: You can implement very different behavior using a CoProcessFunction. However, if your operator is time-based, the logical time of the operator will be the minimum time of both streams (time of the "slower" watermark).

@Vijay: I did not understand what your requirements are. Do you want to join or merge streams? Those are two different things. This thread discusses joins, not merging.

Best,
Fabian


Re: AW: Is watermark used by joining two streams

G.S.Vijay Raajaa
My bad, I meant join only. I am currently using keyBy on a timestamp that is common to both streams.

Regards,
Vijay Raajaa GS 


Re: AW: Is watermark used by joining two streams

Fabian Hueske-2
Hi Vijay,

There are many ways to implement joins with a stateful CoProcessFunction.
It gives you access to the timestamps of records, and you can register timers that fire when a certain time is reached.
How you join and emit data is basically up to you: you can drop late data or emit it. Please note that records are emitted either with their current timestamp (if emitted in processElement()) or with the timestamp of the timer that fired (if emitted in onTimer()).

Hope this helps,
Fabian
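
To make the buffering idea above more concrete, here is a minimal plain-Java model of one possible join strategy. It is not a CoProcessFunction and uses no Flink classes; BufferingJoinSketch, processLeft, processRight and advanceWatermark are hypothetical names chosen for illustration. The sketch assumes records are matched on identical event timestamps, that a record with a timestamp at or below the current watermark counts as late and is dropped, and that unmatched records are expired when the watermark passes them (the role an event-time timer would play).

```java
import java.util.Optional;
import java.util.TreeMap;

/** Plain-Java model of a two-input join that buffers records and drops late ones. */
public class BufferingJoinSketch {
    // Unmatched records per side, keyed by event timestamp (sorted for cheap expiry).
    private final TreeMap<Long, String> pendingLeft = new TreeMap<>();
    private final TreeMap<Long, String> pendingRight = new TreeMap<>();
    private long watermark = Long.MIN_VALUE;

    public Optional<String> processLeft(long timestamp, String value) {
        return process(timestamp, value, pendingLeft, pendingRight, true);
    }

    public Optional<String> processRight(long timestamp, String value) {
        return process(timestamp, value, pendingRight, pendingLeft, false);
    }

    private Optional<String> process(long timestamp, String value,
                                     TreeMap<Long, String> own,
                                     TreeMap<Long, String> other,
                                     boolean isLeft) {
        if (timestamp <= watermark) {
            return Optional.empty(); // late record: dropped in this sketch
        }
        String match = other.remove(timestamp);
        if (match == null) {
            own.put(timestamp, value); // buffer until the partner record arrives
            return Optional.empty();
        }
        // Always emit as "left+right", whichever side arrived last.
        return Optional.of(isLeft ? value + "+" + match : match + "+" + value);
    }

    /** Expires unmatched records at or below the new watermark; returns how many were removed. */
    public int advanceWatermark(long newWatermark) {
        if (newWatermark <= watermark) {
            return 0;
        }
        watermark = newWatermark;
        int before = pendingLeft.size() + pendingRight.size();
        pendingLeft.headMap(newWatermark, true).clear();
        pendingRight.headMap(newWatermark, true).clear();
        return before - (pendingLeft.size() + pendingRight.size());
    }

    public static void main(String[] args) {
        BufferingJoinSketch join = new BufferingJoinSketch();
        System.out.println(join.processLeft(10, "a"));  // Optional.empty
        System.out.println(join.processRight(10, "b")); // Optional[a+b]
        join.processLeft(20, "c");
        System.out.println(join.advanceWatermark(25));  // 1
        System.out.println(join.processRight(20, "d")); // Optional.empty
    }
}
```

In an actual CoProcessFunction the two maps would live in keyed state and the expiry would be done in onTimer(); emitting late data instead of dropping it is an equally valid choice, as noted above.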




Re: AW: Is watermark used by joining two streams

G.S.Vijay Raajaa
Hi Fabian,

Thanks for the reply. I shall try the CoProcessFunction implementation.

Currently, I am trying to assign watermarks on the keyed stream. Here is a snippet of the code for better understanding:

// Consume both Kafka topics as a single stream.
List<String> names = new ArrayList<>();
names.add("stream_a");
names.add("stream_b");
DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer08<>(names, new SimpleStringSchema(), properties));

// Parse each message and key by field 0 of the tuple (the timestamp).
KeyedStream<Tuple2<String, JsonObject>, Tuple> pojo = messageStream.map(new JsonDeserializerv5()).keyBy(0);

// Assign timestamps and watermarks on the keyed stream.
SingleOutputStreamOperator<Tuple2<String, JsonObject>> watermarkStream = pojo.assignTimestampsAndWatermarks(new TimestampExtractorMergerv5());

// Key again by field 0 and merge every two elements per key.
DataStream<JsonObject> merge_stream = watermarkStream.keyBy(0).countWindow(2).apply(new JsonMergerv5());

The above snippet merges on the timestamp (field 0 of the tuple). However, the results reaching the apply function are out of order: even when the streams are joined at t2, which is less than the watermark, they still get processed by the apply function. Kindly let me know if I am not using watermarks properly or have misunderstood how they are meant to be used.

Regards,

Vijay Raajaa G S
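
One point that may be relevant to the snippet above: in Flink's windowing model, countWindow(n) is triggered by the number of elements per key, not by event time, so watermarks do not decide when such a window fires or whether an element is considered late. The following plain-Java model only illustrates that behavior. It is not Flink code, CountWindowSketch is a hypothetical name, and the watermark parameter exists solely to show that it is ignored.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Plain-Java model of a per-key count window that fires purely on element count. */
public class CountWindowSketch {
    private final int size;
    private final Map<String, List<String>> buffers = new HashMap<>();

    public CountWindowSketch(int size) {
        this.size = size;
    }

    /** Returns the window contents once `size` elements for the key have arrived. */
    public Optional<List<String>> add(String key, String value, long currentWatermark) {
        // currentWatermark is deliberately unused: a count trigger only counts elements.
        List<String> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(value);
        if (buffer.size() < size) {
            return Optional.empty();
        }
        buffers.remove(key); // purge the window after firing
        return Optional.of(buffer);
    }

    public static void main(String[] args) {
        CountWindowSketch window = new CountWindowSketch(2);
        System.out.println(window.add("t2", "from_stream_a", 100L)); // Optional.empty
        // The watermark has moved far ahead, yet the window still fires on the second element.
        System.out.println(window.add("t2", "from_stream_b", 500L)); // Optional[[from_stream_a, from_stream_b]]
    }
}
```

If late pairs should be dropped, an event-time-aware operator (for example the CoProcessFunction approach suggested earlier in the thread) would be needed in place of the count window.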

