Hi,

We started a Beam application with the Flink runner with parallelism set to 50. It is a stateful application which uses RocksDB as the state store. We use timers and Beam's value state and bag state (which is the same as Flink's list state), and we do incremental checkpointing.

With the initial parallelism of 50, our application is able to process up to 50,000 records per second. After a week, we took a savepoint and restarted from the savepoint with a parallelism of 18. We are seeing that the application is now only able to process 7,000 records per second. Records processed per task manager is almost half of what it used to process previously with 50 task managers.

We didn't set any maxParallelism in our Beam application, but found from the logs that maxParallelism has been set to -1. Beam's doc for the Flink runner also mentions that maxParallelism is -1 by default: https://beam.apache.org/documentation/runners/flink/ But this Flink doc https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html mentions that by default maxParallelism is set to operatorParallelism + (operatorParallelism / 2), which would be 75 in our case. I don't understand how maxParallelism is set when -1 is passed to Beam's Flink runner. Could a larger number of key groups be causing this performance degradation?

Beam version: 2.19
Flink version: 1.9

Any suggestions/help would be appreciated.

Thanks,
Sandeep Kathula
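(For reference, a minimal sketch of setting these values explicitly instead of leaving them at -1, assuming the FlinkPipelineOptions accessors mirror the "parallelism" / "maxParallelism" options documented for Beam's Flink runner; the class name and values here are only illustrative:)

// Hedged sketch only: pinning parallelism / maxParallelism explicitly through
// the Beam Flink runner options instead of relying on the -1 defaults.
// Accessor names are assumed to match the documented option names.
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class ExplicitMaxParallelism {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    options.setParallelism(50);      // default operator parallelism of the job
    options.setMaxParallelism(128);  // number of key groups; upper limit for rescaling

    Pipeline pipeline = Pipeline.create(options);
    // ... build the stateful transforms here ...
    pipeline.run().waitUntilFinish();
  }
}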
Hi,

maxParallelism = -1, the default value, is interpreted as described in the documentation you linked:

> The default setting for the maximum parallelism is roughly operatorParallelism + (operatorParallelism / 2) with a lower bound of 128 and an upper bound of 32768.

So maxParallelism should be 128 in your case, if you haven't changed this value. But aren't you confusing maxParallelism with parallelism? It doesn't seem to have anything to do with the problem you described:

> With initial parallelism of 50, our application is able to process up to 50,000 records per second. After a week, we took a savepoint and restarted from savepoint with the parallelism of 18. We are seeing that our application is only able to process 7000 records per second.

Doesn't this answer your question? Initially you were running with parallelism 50 and you achieved 50k r/s. After decreasing the parallelism and scaling down the cluster, the throughput went down to 7k r/s. That makes sense to me.

Piotrek

On Thu, 16 Jul 2020 at 22:08, Kathula, Sandeep <[hidden email]> wrote:
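(To make the arithmetic concrete, here is a small sketch of the default rule quoted above: take parallelism * 1.5, round up to the next power of two, then clamp to [128, 32768]. As far as I can tell, Flink's KeyGroupRangeAssignment applies essentially this logic. With the values from this thread, both parallelism 50 and parallelism 18 end up with a default maxParallelism of 128, which suggests the number of key groups did not change between the two runs.)

// Sketch of the documented default-maxParallelism rule, not Flink's actual code.
public class DefaultMaxParallelism {
  private static final int LOWER_BOUND = 128;
  private static final int UPPER_BOUND = 1 << 15; // 32768

  static int defaultMaxParallelism(int operatorParallelism) {
    int candidate = operatorParallelism + (operatorParallelism / 2);         // 50 -> 75
    int powerOfTwo = Integer.highestOneBit(Math.max(candidate - 1, 1)) << 1; // 75 -> 128
    return Math.min(Math.max(powerOfTwo, LOWER_BOUND), UPPER_BOUND);
  }

  public static void main(String[] args) {
    System.out.println(defaultMaxParallelism(50)); // 128
    System.out.println(defaultMaxParallelism(18)); // 128 (clamped to the lower bound)
  }
}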