Fwd: Upgrade to Beam 2.28 and Flink 1.12.2 seeing significant performance degradation

Eleanore Jin
Hi community,

I recently upgraded from Beam 2.23 / Flink 1.10.2 to Beam 2.28 / Flink 1.12.2 without changing any code in my Beam pipeline, and I am seeing a significant performance degradation.

The pipeline reads from Kafka topic 1 using Beam's KafkaIO -> filters the messages using a side input (translated into a broadcast stream in Flink) -> applies a JSON patch to transform each message -> writes to another Kafka topic using Beam's KafkaIO.
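The per-record logic of that pipeline can be sketched in plain Python. This is a minimal illustration, not the Beam/KafkaIO code from the post: the Kafka topics are replaced by in-memory lists, and `ALLOWED_KEYS`, `PATCH`, and the `key`/`status` fields are hypothetical stand-ins. The "JSON patch" step is simplified to RFC 6902-style `add`/`replace`/`remove` operations on top-level fields only.

```python
import json
from typing import Iterable, Iterator

# Hypothetical side input: keys allowed through the filter.
# In the real pipeline this is a Beam side input (a Flink broadcast stream).
ALLOWED_KEYS = {"a", "b"}

# Hypothetical patch in RFC 6902 style, restricted to top-level paths like "/status".
PATCH = [
    {"op": "replace", "path": "/status", "value": "processed"},
    {"op": "add", "path": "/source", "value": "topic1"},
]


def apply_patch(doc: dict, patch: list) -> dict:
    """Apply a simplified JSON patch (top-level paths only) to a copy of doc."""
    result = dict(doc)
    for op in patch:
        field = op["path"].lstrip("/")
        if op["op"] in ("add", "replace"):
            if op["op"] == "replace" and field not in result:
                raise KeyError(f"cannot replace missing field {field!r}")
            result[field] = op["value"]
        elif op["op"] == "remove":
            del result[field]
        else:
            raise ValueError(f"unsupported op {op['op']!r}")
    return result


def process(records: Iterable[str], allowed: set, patch: list) -> Iterator[str]:
    """Read -> filter by side input -> patch -> serialize for the output topic."""
    for raw in records:
        msg = json.loads(raw)
        if msg.get("key") not in allowed:
            continue  # dropped by the side-input filter
        yield json.dumps(apply_patch(msg, patch), sort_keys=True)


if __name__ == "__main__":
    topic1 = [
        '{"key": "a", "status": "new"}',
        '{"key": "z", "status": "new"}',
        '{"key": "b", "status": "new"}',
    ]
    for out in process(topic1, ALLOWED_KEYS, PATCH):
        print(out)
```

Each record is handled independently here. In Flink, consecutive operators like these can be chained into a single task when they are connected by forward edges with equal parallelism; a rebalance or hash edge breaks the chain and adds a network shuffle between them.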

What I have observed:
1. With checkpointing enabled and my original checkpoint timeout of 30s, no checkpoint has succeeded since the upgrade; every attempt times out.
2. The DAG shown in Flink has changed a lot: more rebalance and hash partitioning steps have been introduced.
   image (68).png: Beam 2.23 / Flink 1.10.2
   image (69).png: Beam 2.28 / Flink 1.12.2
3. Even with checkpointing disabled, throughput dropped from 80K rps on the old versions to 5K rps on the new ones.
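To put observation 3 in proportion, here is a small sketch of the arithmetic using only the two throughput figures reported above (the helper name `slowdown` is illustrative):

```python
from typing import Tuple


def slowdown(before_rps: float, after_rps: float) -> Tuple[float, float]:
    """Return (slowdown factor, percent drop) between two throughput readings."""
    factor = before_rps / after_rps
    drop_pct = (before_rps - after_rps) / before_rps * 100
    return factor, drop_pct


factor, drop = slowdown(80_000, 5_000)
print(f"{factor:.0f}x slower, {drop:.2f}% lower throughput")
# prints: 16x slower, 93.75% lower throughput
```

In other words, the upgraded job processes one record for every sixteen the old job handled, with checkpointing off.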

Any suggestions would be much appreciated.

Thanks a lot for your help!
Eleanore

Re: Upgrade to Beam 2.28 and Flink 1.12.2 seeing significant performance degradation

Arvid Heise
Hi Eleanore,

I assume the degradation is caused by the more complex DAG, which probably adds latency and prolongs checkpointing.

I recommend asking this question on the Beam mailing list; they can probably explain why the DAG became so complicated and how to tweak the pipeline to get back to the previous shape.

On Tue, Apr 20, 2021 at 8:17 AM Eleanore Jin <[hidden email]> wrote:

Re: Upgrade to Beam 2.28 and Flink 1.12.2 seeing significant performance degradation

Eleanore Jin
Hi Arvid, 

Yes, I have reached out to the Beam community. Thanks for the suggestion.

Eleanore

On Wed, Apr 21, 2021 at 10:22 AM Arvid Heise <[hidden email]> wrote: