Union of more than two streams

B.B.
Hi,

I have an architecture question regarding the union of more than two streams in Apache Flink.

We have three, and sometimes more, streams that act as code books, which we use to enrich the main stream.
The code book streams are compacted Kafka topics. Code books are something that doesn't change often, e.g. currency. The main stream is a fast event stream.

The idea is to union all the code books, join the result with the main stream, and store the enrichment data as managed, keyed state (so that when the compacted events expire from Kafka, I still have the code books saved in state).

The problem is that each code book connects to the main stream through a different foreign key. E.g. codebook_1 has the foreign key codebook_fk1, codebook_2 has the foreign key codebook_fk2, ….
This means I cannot use keyBy with a CoProcessFunction.

Is this doable with union, or should I cascade a series of connects with the main stream, e.g. mainstream.connect(codebook_1) -> mainstreamWithCodebook1.connect(codebook_2) -> mainstreamWithCodebook1AndCodebook2.connect(codebook_3) -> ….?
I read somewhere that the latter approach is not memory friendly.

Thx.

BB.

Re: Union of more than two streams

Yun Gao
Hi, 

With a.connect(b).coprocess(xx).connect(c).coprocess(xx), two operators would be
created. The first operator would combine a and b and output the enriched data,
and then .connect(c).coprocess(xx) would pass through the already enriched data
and enrich the records from c. Since the two operators cannot be chained, performance
would likely be affected.

Another method is to first label each input with a tag, e.g. ("a", a record), ("b", b record), ...,
and then use

a.union(b).union(c).union(d).process(xx)

Then, in the process operator, different logic can be chosen according to the tag.

If adding a tag is hard, you might need to use the new multiple-input operator, which would
require the low-level API of Flink, so I would recommend trying the tag + union method above first.
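To make the tag + union idea concrete, here is a minimal plain-Java sketch of the dispatch logic that would sit inside the single process operator. It deliberately uses no Flink classes: Tagged and TagDispatcher are hypothetical names, the "main" tag and the "value" field are assumptions, and the in-memory map only stands in for managed state. It also leaves aside how the unioned stream would be keyed or partitioned, which the thread does not settle.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical tagged element, standing in for ("a", a record), ("b", b record), ...
final class Tagged {
    final String tag;                 // which input the record came from, e.g. "main" or "currency"
    final String key;                 // code book entry id; unused (null) for main events
    final Map<String, String> fields; // record payload

    Tagged(String tag, String key, Map<String, String> fields) {
        this.tag = tag;
        this.key = key;
        this.fields = fields;
    }
}

// Stand-in for the one process operator behind a.union(b).union(c).process(xx).
final class TagDispatcher {
    // tag -> (entry id -> value); in Flink this would be managed state, here a plain map
    private final Map<String, Map<String, String>> codebooks = new HashMap<>();
    // foreign-key field on the main event -> tag of the code book it points to
    private final Map<String, String> fkToTag;

    TagDispatcher(Map<String, String> fkToTag) {
        this.fkToTag = fkToTag;
    }

    // Code book records only update state and emit nothing; main events are emitted enriched.
    List<Map<String, String>> process(Tagged in) {
        List<Map<String, String>> out = new ArrayList<>();
        if ("main".equals(in.tag)) {
            Map<String, String> enriched = new HashMap<>(in.fields);
            for (Map.Entry<String, String> fk : fkToTag.entrySet()) {
                String id = in.fields.get(fk.getKey());
                Map<String, String> book = codebooks.get(fk.getValue());
                String value = (id == null || book == null) ? null : book.get(id);
                if (value != null) {
                    enriched.put(fk.getValue(), value);
                }
            }
            out.add(enriched);
        } else {
            codebooks.computeIfAbsent(in.tag, t -> new HashMap<>())
                     .put(in.key, in.fields.get("value"));
        }
        return out;
    }
}
```

The point of the sketch is only that one operator can branch on the tag, so every code book can use its own foreign key without a separate connect per code book.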

Best, 
Yun
------------------Original Mail ------------------
Sender:B.B. <[hidden email]>
Send Date:Fri Apr 2 16:41:16 2021
Recipients:flink_user <[hidden email]>
Subject:Union of more then two streams

Re: [External] : Union of more than two streams

Fuyao Li-2
In reply to this post by B.B.

Hello BB,

 

Just want to share some of my immature ideas with you. Maybe some experts can give you better solutions and advice.

  1. DataStream-based solution:
    1. To do a union, as you already know, the DataStreams must all have the same type. Otherwise, you can't do it. There is a workaround for your problem. You can ingest each stream with a DeserializationSchema and map the different code book streams to the same Java type: one field holds the foreign key value (the values of codebook_fk1, codebook_fk2, ... are all stored here), and another field holds just the name of the foreign key (e.g. codebook_fk1). All other fields should also be generalized into this Java type. After that, you can union the different code book streams and join the result with the main stream.
    2. Cascading connected streams is, I guess, not a recommended approach. In addition to memory, I think it will also make the watermarks hard to coordinate.
  2. Flink SQL approach:

You can try a Flink temporal table join to do the join work here [1][2]. With this approach, you cascade the joins to enrich the main stream. This seems to fit your use case, since your enrichment streams don't change often and contain something like currency. For such a join, there should be some internal optimization that might get rid of some of the memory consumption issues, I guess? Maybe I am wrong. But it is worth taking a look.
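The generalized type described in the DataStream-based item above could look roughly like the following plain-Java sketch. Everything here is illustrative: Currency, OrganizationUnit and their fields are assumed shapes, the assignment of codebook_fk1 to currency and codebook_fk2 to organization unit is an assumption, and the lists stand in for the mapped streams that would then be unioned.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical code book types; the field names are assumptions for illustration.
final class Currency {
    final String currencyId;
    final String name;

    Currency(String currencyId, String name) {
        this.currencyId = currencyId;
        this.name = name;
    }
}

final class OrganizationUnit {
    final String organizationId;
    final String title;

    OrganizationUnit(String organizationId, String title) {
        this.organizationId = organizationId;
        this.title = title;
    }
}

// One common type that every code book stream is mapped to before the union.
final class CodebookRecord {
    final String fkName;                  // name of the foreign key, e.g. "codebook_fk1"
    final String fkValue;                 // value of that foreign key
    final Map<String, String> attributes; // all remaining fields, generalized

    CodebookRecord(String fkName, String fkValue, Map<String, String> attributes) {
        this.fkName = fkName;
        this.fkValue = fkValue;
        this.attributes = attributes;
    }

    static CodebookRecord from(Currency c) {
        Map<String, String> attrs = new HashMap<>();
        attrs.put("name", c.name);
        return new CodebookRecord("codebook_fk1", c.currencyId, attrs);
    }

    static CodebookRecord from(OrganizationUnit o) {
        Map<String, String> attrs = new HashMap<>();
        attrs.put("title", o.title);
        return new CodebookRecord("codebook_fk2", o.organizationId, attrs);
    }
}

final class CodebookUnion {
    // Stand-in for mapping each code book stream to CodebookRecord and unioning the results.
    static List<CodebookRecord> union(List<Currency> currencies, List<OrganizationUnit> units) {
        List<CodebookRecord> all = new ArrayList<>();
        for (Currency c : currencies) {
            all.add(CodebookRecord.from(c));
        }
        for (OrganizationUnit o : units) {
            all.add(CodebookRecord.from(o));
        }
        return all;
    }
}
```

Once every code book has this one shape, the union is type-correct, and the fkName field tells downstream logic which foreign key of the main stream a record belongs to.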

 

 

Reference:

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/joins.html

[2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/joins.html#event-time-temporal-join

 

Best,

Fuyao

 

 

From: B.B. <[hidden email]>
Date: Friday, April 2, 2021 at 01:41
To: [hidden email] <[hidden email]>
Subject: [External] : Union of more then two streams

Re: [External] : Union of more than two streams

Fuyao Li-2

Hello BB,

 

  1. For the DataStream approach, you can use the broadcast pattern to build state and enrich your data from it, instead of using a join.
    1. You can define something like this:

class CodebookData {
    private Currency currency;
    private OrganizationUnit organizationUnit;
    ...
}

 

    2. You can leverage a broadcast stream [1], since you mentioned that your code book streams don't have much data. This is a good use case for the broadcast pattern. Connect the wrapper class DataStream with the main stream and simply enrich it with the state you built. Not sure if this fits your use case…. Please check.
  2. I am not sure. A lateral table join (temporal join) is designed to handle this kind of data enrichment workload. You have a main table and a probe-side table… I suppose there is some kind of optimization, but maybe I am wrong... In theory, it is still based on a join, so maybe you can forget about this part. Anyway, Flink SQL will make the join easier.
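As a rough illustration of the broadcast pattern from item 1, the plain-Java sketch below simulates its core property: every parallel instance receives every code book update, so whichever instance gets a main-stream event can enrich it locally. No Flink classes are used; EnrichSubtask and BroadcastSim are hypothetical names, and the convention that the event field is the code book name plus "Id" (currencyId, organizationId) is an assumption taken from the key names mentioned in this thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// One parallel instance of the enrichment operator with its own copy of the broadcast state.
final class EnrichSubtask {
    // code book name -> (id -> value); stands in for Flink broadcast state
    private final Map<String, Map<String, String>> state = new HashMap<>();

    void onBroadcast(String codebook, String id, String value) {
        state.computeIfAbsent(codebook, k -> new HashMap<>()).put(id, value);
    }

    // Looks up each known code book through its own key field on the event.
    Map<String, String> enrich(Map<String, String> event) {
        Map<String, String> out = new HashMap<>(event);
        for (Map.Entry<String, Map<String, String>> book : state.entrySet()) {
            String id = event.get(book.getKey() + "Id"); // e.g. "currencyId"
            String value = (id == null) ? null : book.getValue().get(id);
            if (value != null) {
                out.put(book.getKey(), value);
            }
        }
        return out;
    }
}

final class BroadcastSim {
    private final List<EnrichSubtask> subtasks = new ArrayList<>();

    BroadcastSim(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new EnrichSubtask());
        }
    }

    // A broadcast element is delivered to every subtask.
    void broadcast(String codebook, String id, String value) {
        for (EnrichSubtask s : subtasks) {
            s.onBroadcast(codebook, id, value);
        }
    }

    // A main-stream event goes to exactly one subtask (chosen by the caller in this sketch).
    Map<String, String> send(int subtask, Map<String, String> event) {
        return subtasks.get(subtask).enrich(event);
    }
}
```

Because the full code book is replicated to each instance, the main stream does not need to be keyed by any one foreign key, which is what makes this pattern attractive when the code books are small.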

 

 

Reference:

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html

 

Best,

Fuyao

 

From: B.B. <[hidden email]>
Date: Monday, April 5, 2021 at 06:27
To: Fuyao Li <[hidden email]>
Subject: Re: [External] : Union of more then two streams

Hi Fuyao,

thanks for your input.

I have follow-up questions regarding your advice.

 

Could you elaborate a little more on case a) of your suggested DataStream solution? When you create that kind of generalized type, how would you join it with the main stream? Which key would you use?

I was thinking of creating a wrapper class that holds all the data from the code books. For example:

class CodebookData {
    private Currency currency;
    private OrganizationUnit organizationUnit;
    ...
}

But then I have the problem of which key to use to join with the main stream, because currency has its own key currencyId, organization unit has its own key organizationId, and so on.

 

Regarding your second suggested solution, with Flink SQL, what do you mean by

 “For such join, there should be some internal optimization and might get rid of some memory consumption issues”?

 

Thx in advance

 

BB

 

 

On Mon, 5 Apr 2021 at 07:29, Fuyao Li <[hidden email]> wrote:
