Joining more than 2 streams

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Joining more than 2 streams

Gagan Agrawal
Hi,
I want to do window join on multiple Kafka streams (say a, b, c) on common field in all 3 streams and apply some custom function on joined stream. As I understand we can join only 2 streams at a time via DataStream api. So may be I need to join a and b first and then join first joined stream with c. I want to understand how would stream state be stored in backend? Since I will be joining a and b stream first, I believe both streams will be stored in state backend for window time. And then again join of first joined stream (of a and b) with c will result storage of all 3 streams for windowed period. Does that mean stream a and b are stored twice in state backend? 

Let's say instead of using inbuilt join api, if I rather union all 3 streams (after transforming them to common schema) and keyBy stream on common field and apply process function where I implement joining on my own and store streams in some state backend, will that be more storage efficient as I will be saving 3 streams just once instead of twice? 

Gagan 
Reply | Threaded
Open this post in threaded view
|

Re: Joining more than 2 streams

Fabian Hueske-2
Hi,

Yes, your reasoning is correct. If you use two binary joins, the data of the first two streams will be buffered twice.
Unioning all three streams and joining them in a custom ProcessFunction would reduce the amount of required state.

Best, Fabian

Am Sa., 24. Nov. 2018 um 14:08 Uhr schrieb Gagan Agrawal <[hidden email]>:
Hi,
I want to do window join on multiple Kafka streams (say a, b, c) on common field in all 3 streams and apply some custom function on joined stream. As I understand we can join only 2 streams at a time via DataStream api. So may be I need to join a and b first and then join first joined stream with c. I want to understand how would stream state be stored in backend? Since I will be joining a and b stream first, I believe both streams will be stored in state backend for window time. And then again join of first joined stream (of a and b) with c will result storage of all 3 streams for windowed period. Does that mean stream a and b are stored twice in state backend? 

Let's say instead of using inbuilt join api, if I rather union all 3 streams (after transforming them to common schema) and keyBy stream on common field and apply process function where I implement joining on my own and store streams in some state backend, will that be more storage efficient as I will be saving 3 streams just once instead of twice? 

Gagan