how to propagate watermarks across multiple jobs


yidan zhao
I have a job with more than 50 tasks. I want to split it into multiple jobs, with the data transferred between them through Kafka. But what about the watermarks?

Has anyone done something similar and solved this problem?

Here is an example:
The original job: kafkaStream1(src-topic) => xxxProcess => xxxWindow1 ==> xxxWindow2 ==> resultSinkToKafka(result-topic).

The new job1: kafkaStream1(src-topic) => xxxProcess => xxxWindow1 ==> resultSinkToKafka(mid-topic).
The new job2: kafkaStream1(mid-topic) => xxxWindow2 ==> resultSinkToKafka(result-topic).

The watermarks for window1 and window2 are now generated separately in the two jobs. This seems to work, but it introduces a 5-minute delay for window2, because both windows have a 5-minute cycle.

If the watermark could be transferred between jobs, this would no longer be a problem.
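
One plausible reading of where the 5-minute delay comes from can be sketched with plain arithmetic. The sketch below is not Flink code. It assumes that a window result carries the last millisecond of its window as its timestamp, and that job2 derives its watermark from record timestamps with a bounded-out-of-orderness style rule (maximum timestamp minus the allowed lateness minus 1). All function names are hypothetical.

```python
WINDOW_MS = 5 * 60 * 1000  # both windows are 5-minute tumbling windows


def window_end(ts_ms: int) -> int:
    """Exclusive end of the tumbling window that contains ts_ms."""
    return ts_ms - ts_ms % WINDOW_MS + WINDOW_MS


def result_timestamp(window_end_ms: int) -> int:
    """Assumption: a window result is stamped with the window's last millisecond."""
    return window_end_ms - 1


def watermark_from_records(max_ts_ms: int, out_of_orderness_ms: int = 0) -> int:
    """Assumption: watermark derived only from the record timestamps seen so far."""
    return max_ts_ms - out_of_orderness_ms - 1


def fires(window_end_ms: int, watermark_ms: int) -> bool:
    """An event-time window fires once the watermark reaches its last millisecond."""
    return watermark_ms >= window_end_ms - 1


first_end = window_end(0)            # end of the first 5-minute window
second_end = window_end(first_end)   # end of the following window

# Single job: the watermark that fired window1 also reaches window2.
in_job_wm = first_end - 1
# Split jobs: job2 only sees the window1 result it reads from mid-topic.
split_wm = watermark_from_records(result_timestamp(first_end))
# Split jobs, one cycle later: the next window1 result has arrived.
split_wm_next = watermark_from_records(result_timestamp(second_end))

print(fires(first_end, in_job_wm))      # True
print(fires(first_end, split_wm))       # False
print(fires(first_end, split_wm_next))  # True
```

Under these assumptions, window2 in job2 can only fire after the next window1 result shows up in mid-topic, which is one full window cycle later.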


Re: how to propagate watermarks across multiple jobs

Piotr Nowojski-4
Hi,

Couldn't you write the watermark as a special event to the "mid-topic"? In "new job2" you would parse this event and use it to assign the watermark before `xxxWindow2`. I believe this is what FlinkKafkaShuffle does [1]; you could look at its code for inspiration.
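
A minimal sketch of the "watermark as a special event" idea, written as a stdlib-only model rather than against the Flink or Kafka APIs. The JSON envelope, the field names (`type`, `ts`, `value`) and the functions `encode_record`, `encode_watermark` and `replay` are all hypothetical. This is not how FlinkKafkaShuffle serializes its data.

```python
import json
from typing import Iterable, Iterator, Optional, Tuple

RECORD = "record"
WATERMARK = "watermark"


def encode_record(value: str, timestamp_ms: int) -> bytes:
    """job1 side: wrap a normal result in a tagged envelope."""
    return json.dumps({"type": RECORD, "ts": timestamp_ms, "value": value}).encode("utf-8")


def encode_watermark(watermark_ms: int) -> bytes:
    """job1 side: write the current watermark as a special event."""
    return json.dumps({"type": WATERMARK, "ts": watermark_ms}).encode("utf-8")


def replay(messages: Iterable[bytes]) -> Iterator[Tuple[str, Optional[str], int]]:
    """job2 side: parse the envelopes and re-emit records and watermarks.

    Watermarks that would move backwards are dropped, so the regenerated
    watermark only ever increases.
    """
    current: Optional[int] = None
    for raw in messages:
        event = json.loads(raw.decode("utf-8"))
        if event["type"] == WATERMARK:
            if current is None or event["ts"] > current:
                current = event["ts"]
                yield (WATERMARK, None, current)
        else:
            yield (RECORD, event["value"], event["ts"])


# A list stands in for one partition of mid-topic.
mid_topic = [
    encode_record("a", 10),
    encode_watermark(9),
    encode_record("b", 20),
    encode_watermark(5),   # stale watermark, dropped on replay
    encode_watermark(19),
]
for event in replay(mid_topic):
    print(event)
```

In a real job, the output of `replay` would feed whatever mechanism assigns timestamps and watermarks ahead of `xxxWindow2`.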

Piotrek 


In reply to yidan zhao's message of Mon, 1 Mar 2021, 13:01.


Re: how to propagate watermarks across multiple jobs

yidan zhao
Yes, you are right, thank you. I took a brief look at what FlinkKafkaShuffle does; it seems to be what I need, and I will give it a try.

Re: how to propagate watermarks across multiple jobs

Piotr Nowojski-4
Great :)

One more note: FlinkKafkaShuffle currently has a critical bug [1] that will probably prevent you from using it directly. I hope it will be fixed in an upcoming release. In the meantime, you can use its source code as inspiration for your own solution.

Best,
Piotrek



In reply to yidan zhao's message of Thu, 4 Mar 2021, 03:48.


Re: how to propagate watermarks across multiple jobs

yidan zhao
One more question: if I only need the watermark logic, not a keyedStream, why not provide methods such as writeDataStream and readDataStream? They would work in a similar way: the Kafka producer sink writes the records and broadcasts the watermark to all partitions, and the Kafka consumers read it and regenerate the watermark. I think that would be more general. The Kafka consumer would read the stream from Kafka and could then still call keyBy to get a keyedStream. I don't understand why KafkaShuffle only considers the keyedStream case.
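
The broadcast-and-regenerate scheme described here can be modelled in a few lines. This is a stdlib-only sketch under stated assumptions, not KafkaShuffle's implementation: `MidTopic` and `PartitionReader` are hypothetical names, lists stand in for Kafka partitions, and the reader is assumed to know how many producer subtasks write to the topic. The reader takes the minimum over the latest watermark of every producer, which is the usual rule for combining watermarks from several inputs.

```python
from typing import Dict, List, Optional, Tuple

# (kind, producer_id, timestamp_ms, payload); kind is "record" or "watermark"
Message = Tuple[str, int, int, Optional[str]]


class MidTopic:
    """In-memory stand-in for a Kafka topic with a fixed number of partitions."""

    def __init__(self, num_partitions: int) -> None:
        self.partitions: List[List[Message]] = [[] for _ in range(num_partitions)]

    def send_record(self, producer_id: int, partition: int, timestamp_ms: int, value: str) -> None:
        # A record goes to exactly one partition.
        self.partitions[partition].append(("record", producer_id, timestamp_ms, value))

    def broadcast_watermark(self, producer_id: int, watermark_ms: int) -> None:
        # A watermark is copied to every partition so that each reader sees it.
        for log in self.partitions:
            log.append(("watermark", producer_id, watermark_ms, None))


class PartitionReader:
    """Regenerates one watermark per partition from all producers' watermarks."""

    def __init__(self, num_producers: int) -> None:
        self.num_producers = num_producers
        self.latest: Dict[int, int] = {}
        self.current: Optional[int] = None

    def on_message(self, message: Message) -> Optional[int]:
        """Return a new watermark if this message advances it, else None."""
        kind, producer_id, ts, _ = message
        if kind != "watermark":
            return None
        self.latest[producer_id] = max(ts, self.latest.get(producer_id, ts))
        if len(self.latest) < self.num_producers:
            return None  # wait until every producer has reported at least once
        candidate = min(self.latest.values())
        if self.current is None or candidate > self.current:
            self.current = candidate
            return candidate
        return None


topic = MidTopic(num_partitions=2)
topic.broadcast_watermark(producer_id=0, watermark_ms=100)
topic.send_record(producer_id=1, partition=0, timestamp_ms=90, value="x")
topic.broadcast_watermark(producer_id=1, watermark_ms=80)
topic.broadcast_watermark(producer_id=0, watermark_ms=200)
topic.broadcast_watermark(producer_id=1, watermark_ms=150)

reader = PartitionReader(num_producers=2)
emitted = [wm for wm in map(reader.on_message, topic.partitions[0]) if wm is not None]
print(emitted)  # [80, 150]
```

Nothing in this model depends on how records are assigned to partitions, so the same logic applies whether or not the stream is keyed.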

In reply to Piotr Nowojski's message of Thu, 4 Mar 2021, 3:54 PM.


Re: how to propagate watermarks across multiple jobs

yidan zhao
I uploaded a picture to describe that.

Re: how to propagate watermarks across multiple jobs

yidan zhao
Also, do you know when the Kafka consumer/producer will be reimplemented on top of the new source/sink API? I am wondering whether I should adapt my code now, since I would need to adapt it again once the connector is rebuilt on the new source/sink API.

In reply to yidan zhao's message of Thu, 4 Mar 2021, 4:44 PM.


Re: how to propagate watermarks across multiple jobs

Yuan Mei
In reply to this post by yidan zhao
Hey Yidan, 

KafkaShuffle was initially motivated by supporting shuffle data materialization on Kafka, and it started with a limited version that supports hash partitioning only. The watermark is maintained and forwarded as part of the shuffle data. So you are right: the watermark storing/forwarding logic has nothing to do with whether the stream is keyed or not. The current approach in KafkaShuffle should also work for non-keyed streams, if I remember correctly. So yes, the logic can be extracted and generalized.

Best,

Yuan

In reply to yidan zhao's message of Thu, 4 Mar 2021, 4:26 PM.


Re: how to propagate watermarks across multiple jobs

yidan zhao
Thank you.

In reply to Yuan Mei's message of Thu, 4 Mar 2021, 11:10 PM.