Will broadcast stream affect performance because of the absence of operator chaining?

黄兆鹏
Hi all,
My Flink job processes data with a dynamic schema, so I want to consume a Kafka topic that carries the schema and broadcast it to every operator, so that each operator knows what kind of data it is handling.

For example, the two streams look like this:
OperatorA  ->  OperatorB  ->  OperatorC
    ^              ^              ^
    |              |              |
            BroadcastStream

Without the broadcast stream, OperatorA, OperatorB, and OperatorC are chained together in one slot because they have the same parallelism, which gives the best performance.

What I am wondering is: if the broadcast stream exists, will it affect performance? Or will Flink still chain the operators together to get the best performance?

Thanks!

Piotr Nowojski-3
Hi,

Broadcasting will break an operator chain. However, my best guess is that the Kafka source will still be the performance bottleneck in your job. Also, network exchanges add measurable overhead only if your records are very lightweight and cheap to process (for example, if you are using RocksDB, you can just ignore the network costs).

Either way, you can just try this out. Pre-populate your Kafka topic with a significant number of messages, run both jobs, compare the throughput, and decide based on those results whether this is OK for you or not.

Piotrek

> On 6 Aug 2019, at 09:56, 黄兆鹏 <[hidden email]> wrote:

黄兆鹏
Hi Piotrek,
Thanks for your reply. My broadcast stream only listens for schema changes, which are very infrequent and very lightweight.

In fact, there are two ways to solve my problem.

The first is a broadcast stream that listens for schema changes and broadcasts them to every operator that handles the data, just as I posted originally:
DataStream: OperatorA  ->  OperatorB  ->  OperatorC
                ^              ^              ^
                |              |              |
                        BroadcastStream

The second approach is an operator that joins my data and the schema together and sends the result to the downstream operators:
DataStream: MergeSchemaOperator  ->  OperatorA  ->  OperatorB  ->  OperatorC
                     ^
                     |
              BroadcastStream


The benefit of the first approach is that the Flink job does not have to transfer the schema along with the actual data records between operators, because the schema is broadcast to each operator.
The disadvantage of the first approach is that it breaks the operator chain, so the operators may not be executed in the same slot and performance may be worse.

The second approach does not have that problem, but each message carries its schema information between operators, which costs about 2x in serialization and deserialization between operators.

Is there a better workaround that lets all the operators notice schema changes without breaking the operator chain?

Thanks!

 
 
------------------ Original ------------------
Date:  Tue, Aug 6, 2019 04:23 PM
To:  "黄兆鹏"<[hidden email]>;
Cc:  "user"<[hidden email]>;
Subject:  Re: Will broadcast stream affect performance because of the absence of operator chaining?
 

Piotr Nowojski-3
Hi,

Have you measured the performance impact of breaking the operator chain?

This is a current limitation of Flink chaining: if an operator has two inputs, it cannot be chained to something else (only one-input operators are chained together). There are plans to address this in the future.

As a workaround, besides what you have mentioned:
- Maybe your record type can be a union: either a Record or a Schema (not a Record AND a Schema), and the upstream operators (operatorA) could just ignore/forward the Schema. You wouldn't need to send the schema with every record.
- Another (ugly) solution is to implement the BroadcastStream input outside of Flink, but then you might have issues with checkpointing/watermarking, and it just makes many things more complicated.
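
The union idea can be sketched in plain Java, without any Flink dependency. The `Element` and `SchemaAwareOperator` types below are hypothetical names, not Flink API; the sketch only shows how a single-input operator could update its local schema from in-band schema elements and forward them, for one parallel instance. How a schema element would reach every parallel subtask is not covered.

```java
import java.util.ArrayList;
import java.util.List;

class UnionRecordSketch {

    /** Hypothetical union element: holds either a schema update or a data record, never both. */
    static final class Element {
        final String schema; // non-null only for schema updates
        final String record; // non-null only for data records

        private Element(String schema, String record) {
            this.schema = schema;
            this.record = record;
        }

        static Element ofSchema(String schema) { return new Element(schema, null); }
        static Element ofRecord(String record) { return new Element(null, record); }
        boolean isSchema() { return schema != null; }
    }

    /** Hypothetical single-input operator: remembers the latest schema and forwards it downstream. */
    static final class SchemaAwareOperator {
        private final String name;
        private String currentSchema = "none";

        SchemaAwareOperator(String name) { this.name = name; }

        Element process(Element in) {
            if (in.isSchema()) {
                currentSchema = in.schema; // update the local view of the schema
                return in;                 // forward unchanged so the next operator sees it too
            }
            // Data record: tag it with the operator name and the schema it was handled under.
            return Element.ofRecord(in.record + "|" + name + ":" + currentSchema);
        }
    }

    public static void main(String[] args) {
        SchemaAwareOperator a = new SchemaAwareOperator("A");
        SchemaAwareOperator b = new SchemaAwareOperator("B");
        List<Element> input = new ArrayList<>();
        input.add(Element.ofSchema("v1"));
        input.add(Element.ofRecord("r1"));
        for (Element e : input) {
            // A and B each have a single input, so nothing here requires a second input stream.
            Element out = b.process(a.process(e));
            if (!out.isSchema()) {
                System.out.println(out.record);
            }
        }
    }
}
```

Only schema changes travel as separate elements here, so ordinary records do not carry the schema with them.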

Piotrek

On 6 Aug 2019, at 10:50, 黄兆鹏 <[hidden email]> wrote:


黄兆鹏
Hi Piotrek,
I previously considered your first suggestion (a union record type), but I found that the schema would be sent to only one subtask of the operator (for example, operatorA), and the other subtasks of the operator would not be aware of it.
Is there anything I have missed in this case?
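
The difference can be illustrated with a small routing simulation in plain Java. `routeToOne` and `routeToAll` are hypothetical helpers, and round-robin is only a stand-in for whatever non-broadcast partitioning the job uses; this is not Flink code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RoutingSketch {

    private static List<List<String>> emptySubtasks(int parallelism) {
        List<List<String>> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new ArrayList<>());
        }
        return subtasks;
    }

    /** Point-wise routing: each element goes to exactly one subtask (round-robin as a stand-in). */
    static List<List<String>> routeToOne(List<String> elements, int parallelism) {
        List<List<String>> subtasks = emptySubtasks(parallelism);
        for (int i = 0; i < elements.size(); i++) {
            subtasks.get(i % parallelism).add(elements.get(i));
        }
        return subtasks;
    }

    /** Broadcast routing: every subtask receives a copy of every element. */
    static List<List<String>> routeToAll(List<String> elements, int parallelism) {
        List<List<String>> subtasks = emptySubtasks(parallelism);
        for (List<String> subtask : subtasks) {
            subtask.addAll(elements);
        }
        return subtasks;
    }

    public static void main(String[] args) {
        List<String> schemaUpdates = Arrays.asList("schema-v2");
        System.out.println("one-to-one: " + routeToOne(schemaUpdates, 3));
        System.out.println("broadcast:  " + routeToAll(schemaUpdates, 3));
    }
}
```

With a parallelism of 3, a single in-band schema element lands in one subtask's list under `routeToOne`, while `routeToAll` delivers it to all three.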

Thank you!





 
------------------ Original ------------------
Date:  Tue, Aug 6, 2019 06:57 PM
To:  "黄兆鹏"<[hidden email]>;
Cc:  "user"<[hidden email]>;
Subject:  Re: Will broadcast stream affect performance because of the absence of operator chaining?
 

Piotr Nowojski-3
Hi,

No, I think you are right, I forgot about the broadcasting requirement.

Piotrek

On 6 Aug 2019, at 13:11, 黄兆鹏 <[hidden email]> wrote:


Victor Wong

Hi,

If the performance impact of breaking the operator chain is huge, maybe you can read the latest schema from Kafka within the operators.

It's a little complicated: you have to start a Kafka consumer in e.g. `RichFunction#open()`, start reading from (the largest offset - 1), and handle new messages as they come in.

The good news

 

From: Piotr Nowojski <[hidden email]>
Date: Tuesday, August 6, 2019 at 8:55 PM
To: 黄兆鹏 <[hidden email]>
Cc: user <[hidden email]>
Subject: Re: Will broadcast stream affect performance because of the absence of operator chaining?

 


Victor Wong

Oops, accidentally sent the email.

The good news is that you don’t have to checkpoint the state of the Kafka consumers.
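
A rough sketch of this side-channel idea in plain Java. Everything here is an assumption for illustration: `SchemaAwareMapper` only mimics the `open()`/`close()` lifecycle of a rich function, a `BlockingQueue` stands in for a Kafka consumer on the schema topic, and `initialSchema` stands in for the value read from the end of the topic at startup. A real version would need a Kafka client and proper error handling.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class SchemaSideChannelSketch {

    /** Hypothetical function with open()/close() hooks, similar in spirit to a rich function. */
    static final class SchemaAwareMapper {
        private final BlockingQueue<String> schemaUpdates; // stand-in for the schema topic
        private final AtomicReference<String> latestSchema;
        private Thread poller;

        SchemaAwareMapper(BlockingQueue<String> schemaUpdates, String initialSchema) {
            this.schemaUpdates = schemaUpdates;
            this.latestSchema = new AtomicReference<>(initialSchema);
        }

        /** Starts a background thread that follows schema updates. */
        void open() {
            poller = new Thread(() -> {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        String update = schemaUpdates.poll(100, TimeUnit.MILLISECONDS);
                        if (update != null) {
                            latestSchema.set(update);
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // interrupted by close(): exit quietly
                }
            }, "schema-poller");
            poller.setDaemon(true);
            poller.start();
        }

        /** Per-record path: only reads the shared reference, so it never waits on the schema source. */
        String map(String record) {
            return record + "@" + latestSchema.get();
        }

        /** Stops the background thread. */
        void close() {
            poller.interrupt();
            try {
                poller.join(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> topic = new LinkedBlockingQueue<>();
        SchemaAwareMapper mapper = new SchemaAwareMapper(topic, "v1");
        mapper.open();
        System.out.println(mapper.map("r1"));
        topic.add("v2"); // picked up asynchronously by the poller thread
        mapper.close();
    }
}
```

Because the latest schema is read again whenever the function is opened, this sketch keeps no consumer position that would have to be checkpointed, which matches the point above.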

 

From: Wong Victor <[hidden email]>
Date: Tuesday, August 6, 2019 at 11:31 PM
To: Piotr Nowojski <[hidden email]>, 黄兆鹏 <[hidden email]>
Cc: user <[hidden email]>
Subject: Re: Will broadcast stream affect performance because of the absence of operator chaining?

 


Zhijiang(wangzhijiang999)
In reply to this post by Piotr Nowojski-3
Hi Paul,

In theory, a broadcast operator cannot be chained, because broadcast is an all-to-all exchange and chaining is only feasible for one-to-one exchanges such as forward.
When operators are chained, the next operator processes the raw record emitted by the head operator directly. When they are not, the emitted record must be serialized into a buffer, which the downstream operator then consumes, whether or not it travels over the network. So in theory chaining gives the best performance compared to not chaining.

In your case, if you cannot avoid the broadcast requirement, you have to accept the non-chained topology and test whether the real performance is acceptable to you. If it does not meet your requirements, we could consider further improvements.
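
To make the difference concrete, here is a minimal plain-Java sketch (not Flink code). The names `ChainingSketch`, `Event`, `chained` and `nonChained` are hypothetical, and Java serialization only stands in for Flink's own serializers and network buffers. Flink may also copy records between chained operators unless object reuse is enabled, so treat this purely as a model of "direct hand-off" versus "serialize into a buffer, then deserialize".

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

class ChainingSketch {
    /** Hypothetical record type, used only for illustration. */
    static final class Event implements Serializable {
        private static final long serialVersionUID = 1L;
        final String payload;
        Event(String payload) { this.payload = payload; }
    }

    /** Chained model: the downstream function receives the very same object. */
    static Event chained(Event event, Function<Event, Event> downstream) {
        return downstream.apply(event);
    }

    /** Non-chained model: the record is written to a buffer and read back first. */
    static Event nonChained(Event event, Function<Event, Event> downstream) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(event);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                return downstream.apply((Event) in.readObject());
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        Event event = new Event("hello");
        // Direct hand-off: no copy is made in this model.
        System.out.println("chained sees same instance: " + (chained(event, x -> x) == event));
        // Round trip through a buffer: an equal but distinct object arrives.
        Event copy = nonChained(event, x -> x);
        System.out.println("non-chained sees a copy: " + (copy != event));
    }
}
```

The extra work in `nonChained` is the per-record cost that only shows up in throughput when the records themselves are cheap to process.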

Best,
Zhijiang
------------------------------------------------------------------
From:Piotr Nowojski <[hidden email]>
Send Time: Aug 6, 2019 (Tuesday) 14:55
To:黄兆鹏 <[hidden email]>
Cc:user <[hidden email]>
Subject:Re: Will broadcast stream affect performance because of the absence of operator chaining?

Hi,

No, I think you are right, I forgot about the broadcasting requirement.

Piotrek

On 6 Aug 2019, at 13:11, 黄兆鹏 <[hidden email]> wrote:

Hi, Piotrek,
I previously considered your first suggestion (use a union record type), but I found that the schema would be sent to only one subtask of the operator (for example, operatorA), and the other subtasks of that operator would not be aware of it.
In this case is there anything I have missed? 
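
A small plain-Java model of the concern, assuming the schema element enters the pipeline at a single upstream subtask. Nothing here is Flink API; `ForwardVsBroadcastSketch`, `deliverForward` and `deliverBroadcast` are hypothetical names, and each array slot stands for the schema known to one parallel subtask.

```java
import java.util.Arrays;

class ForwardVsBroadcastSketch {
    /** One-to-one (forward) model: an element from upstream subtask i reaches only downstream subtask i. */
    static String[] deliverForward(String[] schemas, int fromSubtask, String newSchema) {
        String[] updated = schemas.clone();
        updated[fromSubtask] = newSchema;
        return updated;
    }

    /** All-to-all (broadcast) model: every downstream subtask receives the element. */
    static String[] deliverBroadcast(String[] schemas, String newSchema) {
        String[] updated = new String[schemas.length];
        Arrays.fill(updated, newSchema);
        return updated;
    }

    public static void main(String[] args) {
        String[] schemas = {"v1", "v1", "v1"}; // parallelism 3, all subtasks on schema v1
        System.out.println(Arrays.toString(deliverForward(schemas, 0, "v2")));  // [v2, v1, v1]
        System.out.println(Arrays.toString(deliverBroadcast(schemas, "v2")));   // [v2, v2, v2]
    }
}
```

In the forward case, subtasks 1 and 2 keep working with the stale schema, which is the situation described above.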

Thank you!





 
------------------ Original ------------------
Date:  Tue, Aug 6, 2019 06:57 PM
To:  "黄兆鹏"<[hidden email]>;
Cc:  "user"<[hidden email]>;
Subject:  Re: Will broadcast stream affect performance because of the absence of operator chaining?
 
Hi,

Have you measured the performance impact of breaking the operator chain?

This is a current limitation of Flink chaining: an operator with two inputs cannot be chained to anything else (only single-input operators are chained together). There are plans to address this issue in the future.

As a workaround, besides what you have mentioned:
- maybe your record type can be a union: either a Record or a Schema (not Record AND Schema), and upstream operators (operatorA) could just ignore/forward the Schema. You wouldn't need to send the schema with every record.
- another (ugly) solution is to implement the BroadcastStream input outside of Flink, but then you might have issues with checkpointing/watermarking, and it just makes many things more complicated.
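
The union-type idea from the first bullet could look roughly like the plain-Java sketch below. This is not Flink code; `RecordOrSchema`, `SchemaAwareOperator` and `runChain` are hypothetical names, and records and schemas are plain strings for brevity. Each operator remembers the last schema it saw, forwards schema elements unchanged, and processes data records with its current schema.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class UnionTypeSketch {
    /** Hypothetical union element: exactly one of record or schema is non-null. */
    static final class RecordOrSchema {
        final String record;
        final String schema;
        private RecordOrSchema(String record, String schema) {
            this.record = record;
            this.schema = schema;
        }
        static RecordOrSchema ofRecord(String record) { return new RecordOrSchema(record, null); }
        static RecordOrSchema ofSchema(String schema) { return new RecordOrSchema(null, schema); }
        boolean isSchema() { return schema != null; }
    }

    /** Single-input operator that tracks the latest schema seen in-band. */
    static final class SchemaAwareOperator {
        private final String name;
        private String currentSchema = "none";
        SchemaAwareOperator(String name) { this.name = name; }

        RecordOrSchema process(RecordOrSchema in) {
            if (in.isSchema()) {
                currentSchema = in.schema; // remember the new schema
                return in;                 // and forward it so later operators see it too
            }
            // Stand-in for real processing: tag the record with operator name and schema.
            return RecordOrSchema.ofRecord(in.record + "|" + name + "@" + currentSchema);
        }
    }

    /** Pushes every element through the operators in order, like one chained pipeline. */
    static List<RecordOrSchema> runChain(List<RecordOrSchema> input, SchemaAwareOperator... operators) {
        List<RecordOrSchema> output = new ArrayList<>();
        for (RecordOrSchema element : input) {
            RecordOrSchema current = element;
            for (SchemaAwareOperator operator : operators) {
                current = operator.process(current);
            }
            output.add(current);
        }
        return output;
    }

    public static void main(String[] args) {
        List<RecordOrSchema> input = Arrays.asList(
            RecordOrSchema.ofSchema("v1"), RecordOrSchema.ofRecord("r1"),
            RecordOrSchema.ofSchema("v2"), RecordOrSchema.ofRecord("r2"));
        for (RecordOrSchema out : runChain(input, new SchemaAwareOperator("A"), new SchemaAwareOperator("B"))) {
            System.out.println(out.isSchema() ? "schema " + out.schema : out.record);
        }
    }
}
```

Because the schema travels in the same stream as the data, every operator stays single-input. The sketch models only one parallel pipeline, so it says nothing about how a schema element would reach the other subtasks.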

Piotrek

On 6 Aug 2019, at 10:50, 黄兆鹏 <[hidden email]> wrote:

Hi Piotrek,
Thanks for your reply. My broadcast stream only listens for schema changes, which are very infrequent and very lightweight.

In fact, there are two ways to solve my problem.

The first is a broadcast stream that listens for schema changes and broadcasts them to every operator that handles the data, just as I originally posted:
DataStream: OperatorA  ->  OperatorB  ->  OperatorC
                ^              ^              ^
                |              |              |
                        BroadcastStream

The second is an operator that joins my data and the schema together and sends the result to the downstream operators:
DataStream: MergeSchemaOperator  ->  OperatorA  ->  OperatorB  ->  OperatorC
                     ^
                     |
              BroadcastStream


The benefit of the first approach is that the Flink job does not have to transfer the schema along with the real data records between operators, because the schema is broadcast to each operator.
The disadvantage of the first approach is that it breaks the operator chain, so the operators may not be executed in the same slot, and performance gets worse.

The second approach does not have the first one's problem, but each message carries its schema info between operators, which costs about 2x in serialization and deserialization between operators.

Is there a better workaround, so that all the operators notice the schema change without breaking the operator chain?

Thanks!

 
 