multiple streams joining

Lian Jiang
Hi,

Imagine I have a class with four fields: ID, A, B, C. Three data sources provide data in the form (ID, A), (ID, B) and (ID, C) respectively. I want to join these three sources into the final (ID, A, B, C) without any window; for example, (ID, A) could arrive one month after (ID, B). Such a join needs global state. I have two designs in mind.

1. Stream connect with separate Kafka topics
streamA_B = DataSourceA connect DataSourceB
streamA_B_C = streamA_B connect DataSourceC

Each data source is ingested via a dedicated Kafka topic. This design does not seem scalable, because I need N stream connect operations for N+1 data sources, and each connect has to maintain its own global state. For example, streamA_B needs a global state holding (ID, A, B), and streamA_B_C needs another holding (ID, A, B, C).
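A minimal plain-Python sketch of solution 1, assuming each `connect` behaves like a keyed two-input operator that buffers both sides per ID forever (no window). `TwoWayJoin` and `run_cascade` are hypothetical names and the dictionaries only stand in for Flink keyed state; this is not the Flink API. It shows that N+1 sources need a chain of N joins, each holding its own state.

```python
from typing import Any, Dict, List, Optional, Sequence, Tuple

Joined = Tuple[Any, Tuple[Any, ...], Any]  # (id, left values, right value)


class TwoWayJoin:
    """Hypothetical stand-in for one keyed two-input operator (not a Flink class).

    Both sides are buffered per ID and never cleared, because there is no window."""

    def __init__(self) -> None:
        self.left: Dict[Any, Tuple[Any, ...]] = {}
        self.right: Dict[Any, Any] = {}

    def _try_emit(self, key: Any) -> Optional[Joined]:
        if key in self.left and key in self.right:
            return (key, self.left[key], self.right[key])
        return None

    def on_left(self, key: Any, values: Tuple[Any, ...]) -> Optional[Joined]:
        self.left[key] = values
        return self._try_emit(key)

    def on_right(self, key: Any, value: Any) -> Optional[Joined]:
        self.right[key] = value
        return self._try_emit(key)


def run_cascade(sources: Sequence[str], events: List[Tuple[str, Any, Any]]):
    """Feeds (source, id, value) events through a left-deep chain of joins.

    Returns the completed (id, v1, ..., vn) rows and the join operators."""
    joins = [TwoWayJoin() for _ in sources[1:]]  # N joins for N+1 sources
    results = []
    for source, key, value in events:
        idx = sources.index(source)
        if idx == 0:
            stage, out = 0, joins[0].on_left(key, (value,))
        else:
            stage, out = idx - 1, joins[idx - 1].on_right(key, value)
        # Push each newly joined row down the chain until it completes or must wait.
        while out is not None:
            k, left_values, right_value = out
            row = left_values + (right_value,)
            if stage == len(joins) - 1:
                results.append((k,) + row)
                break
            stage += 1
            out = joins[stage].on_left(k, row)
    return results, joins


# (ID, A) arrives last, long after (ID, B) and (ID, C).
rows, joins = run_cascade(["A", "B", "C"], [("B", 1, "b1"), ("C", 1, "c1"), ("A", 1, "a1")])
print(rows)        # [(1, 'a1', 'b1', 'c1')]
print(len(joins))  # 2
```

After this run, `'a1'` sits in the state of both joins: raw in the first one and inside the joined `('a1', 'b1')` row in the second.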

2. Shared Kafka topic
All data sources are ingested via one shared Kafka topic (using a union event type or schema references). A single Flink job can then handle the events from all sources while maintaining one global state. This design seems more scalable than solution 1.
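A minimal plain-Python sketch of solution 2, assuming every source writes a hypothetical `Event` envelope (ID, a tag naming the field it carries, and the value) to the shared topic. One dictionary keyed by ID stands in for the single keyed state of the Flink job; `Event` and `MultiWayJoin` are illustrative names, not a Flink or Kafka API.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional, Tuple


@dataclass(frozen=True)
class Event:
    """Hypothetical union event type shared by all sources on one topic."""
    id: int
    kind: str   # which field this event carries: "A", "B" or "C"
    value: Any


class MultiWayJoin:
    """Hypothetical single operator with one keyed state: ID -> fields seen so far."""

    FIELDS = ("A", "B", "C")

    def __init__(self) -> None:
        self.state: Dict[int, Dict[str, Any]] = {}

    def process(self, event: Event) -> Optional[Tuple[Any, ...]]:
        partial = self.state.setdefault(event.id, {})
        partial[event.kind] = event.value
        # Emit the full (ID, A, B, C) row once every field has arrived, in any order.
        if all(f in partial for f in self.FIELDS):
            return (event.id,) + tuple(partial[f] for f in self.FIELDS)
        return None


join = MultiWayJoin()
for e in [Event(1, "B", "b1"), Event(1, "C", "c1"), Event(1, "A", "a1")]:
    out = join.process(e)
    if out is not None:
        print(out)  # (1, 'a1', 'b1', 'c1')
```

In this shape, adding a fourth source means adding one more tag to `FIELDS` rather than one more operator. State is never cleared here; how long to keep it is a separate decision.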

Which one is recommended? Is there a better way that I have missed? Any hints are much appreciated!

Re: multiple streams joining

JING ZHANG
Hi Lian Jiang,
  Both solutions have their advantages and disadvantages.

  Solution 1:
  advantages:
      1. The code is simple. The chain of join operators fixes the join
order: only already-joined (A, B) rows reach the join that produces
streamA_B_C, and only (A, B, C) rows would reach a further join that
produces streamA_B_C_D. This also matters if more sources need to be
added to the multi-way join in the future.
      2. It does not require the inputs to have the same schema.
  disadvantage:
      1. It costs more state. For an output (ID, A, B, C), (ID, A) is
stored in the state of streamA_B and again, inside the joined rows, in
streamA_B_C (and in streamA_B_C_D if a fourth source D is added). This
can be alleviated when only a small fraction of each input can be
joined with the other side: put the join with the lowest selectivity
(the fewest matches) at the front of the chain.
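To make the state-cost point concrete, here is a rough plain-Python model (an assumption, not a Flink measurement) that counts the per-ID entries a left-deep chain of two-way joins holds once all data has arrived: every raw input is buffered once, and every intermediate joined row is buffered again by the next join. `cascade_state_entries` is a hypothetical helper and the ID sets are made-up example data.

```python
from typing import Dict, Sequence, Set


def cascade_state_entries(order: Sequence[str], ids: Dict[str, Set[int]]) -> int:
    """Hypothetical helper: buffered entries for a chain joining inputs in `order`."""
    # Each raw input is stored once, on the side where it enters the chain.
    total = sum(len(ids[name]) for name in order)
    matched = set(ids[order[0]])
    # Each intermediate result (e.g. joined (ID, A, B)) is stored again by the next join.
    for name in order[1:-1]:
        matched &= ids[name]
        total += len(matched)
    return total


# Made-up data: A and B match on every ID, C matches only 10 of them.
ids = {"A": set(range(1000)), "B": set(range(1000)), "C": set(range(0, 1000, 100))}
print(cascade_state_entries(["A", "B", "C"], ids))  # 3010
print(cascade_state_entries(["C", "A", "B"], ids))  # 2020
```

Starting the chain with the join that matches the fewest IDs keeps the intermediate rows small, which is the reordering suggested above.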

  Solution 2:
  advantage:
      1. It costs less state, because all data is kept in one operator.

  disadvantages:
      1. The code is more complex. Because data from all sources arrives
at one operator, the order in which each source's data comes is not
known in advance, and you have to implement the multi-way join logic
yourself.
      2. It requires the inputs to have the same schema.
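Because arrival order is not fixed in solution 2, the hand-written join logic has to give the same result for every order and must not emit a row twice. A small plain-Python check of that property, assuming a hypothetical `merge` function that keeps per-ID fields plus an "already emitted" set (illustrative names only, not a Flink API):

```python
from itertools import permutations
from typing import Any, Dict, List, Tuple

FIELDS = ("A", "B", "C")


def merge(events: List[Tuple[int, str, Any]]) -> List[Tuple[Any, ...]]:
    """Hypothetical join logic: (id, field, value) events in any order; each ID emitted once."""
    state: Dict[int, Dict[str, Any]] = {}
    emitted = set()  # IDs whose complete row has already been sent downstream
    out = []
    for key, field, value in events:
        partial = state.setdefault(key, {})
        partial[field] = value
        if key not in emitted and all(f in partial for f in FIELDS):
            emitted.add(key)
            out.append((key,) + tuple(partial[f] for f in FIELDS))
    return out


events = [(1, "A", "a1"), (1, "B", "b1"), (1, "C", "c1"), (1, "B", "b1")]  # includes a duplicate B
results = {tuple(merge(list(p))) for p in permutations(events)}
print(results)  # {((1, 'a1', 'b1', 'c1'),)}
```

Whether a later, different value for an already-emitted ID should trigger an update is a further decision this sketch does not make.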

You can decide based on your requirements (state storage cost, development
cost, maintenance cost) and data characteristics (how much of each input
successfully joins with the other side).
BTW, things get more complex for outer joins.
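One way to see why outer joins are harder without a window: the operator has to decide when to stop waiting for a missing side. A per-ID timeout is one possible approach (an assumption here, not something the thread prescribes), similar in spirit to a timer registered from a Flink `KeyedProcessFunction`. The plain-Python sketch below emits a row padded with `None` once an ID has waited at least `timeout` time units; `OuterMultiJoin`, `process` and `on_time` are hypothetical names.

```python
from typing import Any, Dict, List, Optional, Tuple

FIELDS = ("A", "B", "C")


class OuterMultiJoin:
    """Hypothetical outer-join operator: emits complete rows, or padded rows after a timeout."""

    def __init__(self, timeout: int) -> None:
        self.timeout = timeout
        self.state: Dict[int, Dict[str, Any]] = {}
        self.first_seen: Dict[int, int] = {}  # time of each ID's first event

    def _row(self, key: int) -> Tuple[Any, ...]:
        fields = self.state[key]
        return (key,) + tuple(fields.get(f) for f in FIELDS)  # None for missing fields

    def process(self, key: int, field: str, value: Any, now: int) -> Optional[Tuple[Any, ...]]:
        self.state.setdefault(key, {})[field] = value
        self.first_seen.setdefault(key, now)
        if all(f in self.state[key] for f in FIELDS):
            row = self._row(key)
            del self.state[key], self.first_seen[key]
            return row
        return None

    def on_time(self, now: int) -> List[Tuple[Any, ...]]:
        """Emits padded rows for IDs that have waited at least `timeout`."""
        expired = [k for k, t in self.first_seen.items() if now - t >= self.timeout]
        rows = [self._row(k) for k in expired]
        for k in expired:
            del self.state[k], self.first_seen[k]
        return rows


join = OuterMultiJoin(timeout=30)
join.process(1, "A", "a1", now=0)
join.process(2, "A", "a2", now=5)
join.process(2, "B", "b2", now=6)
print(join.on_time(now=31))  # [(1, 'a1', None, None)]
```

In this sketch a late `(1, "C", ...)` would simply start a new partial row; a real job would also have to decide whether to update or retract the padded row it already emitted.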

Best,
JING ZHANG

(In reply to Lian Jiang <[hidden email]>, Thursday, May 27, 2021, 11:20 AM.)