Hello Everyone,
Can someone help me with a solution? I have a Flink job (2 task managers) with a job parallelism of 64 and 64 task slots. For one of the operators I have set the parallelism to 16. The slots for this operator (parallelism 16) are not getting evenly distributed across the two task managers: it often takes 10 or 11 task slots on one task manager and 5 or 6 on the other. I'm using Flink version 1.11.2. I tried adding cluster.evenly-spread-out-slots: true but it didn't work. Any solution is greatly appreciated. Thanks in advance, Regards, Vignesh
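For reference, a minimal sketch of the kind of job described above. The Kafka topic, bootstrap servers, group id, and the map function are assumptions for illustration, not details from the original post:

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public class UnevenSlotsJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(64); // job-wide default parallelism

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "kafka:9092"); // assumed address
            props.setProperty("group.id", "example-group");       // assumed group id

            env.addSource(new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props))
               .name("kafka-source")        // inherits the default parallelism of 64
               .rebalance()                 // round-robin shuffle to the downstream operator
               .map(String::toUpperCase)    // stand-in for the real transformation
               .name("mapper")
               .setParallelism(16)          // the operator whose 16 subtasks spread unevenly
               .print();                    // stand-in sink

            env.execute("uneven-slot-distribution-example");
        }
    }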
Hi Vignesh, are you trying to achieve an even distribution of tasks for this one operator that has the parallelism set to 16? Or do you observe the described behavior also on a job level? I'm adding Chesnay to the thread as he might have more insights on this topic. Best, Matthias On Mon, Mar 22, 2021 at 6:31 PM Vignesh Ramesh <[hidden email]> wrote:
There was a similar discussion recently on this mailing list about distributing work onto different TaskManagers. One finding Xintong shared there [1] was that the parameter cluster.evenly-spread-out-slots controls how slots are allocated evenly among TaskManagers, but not how tasks are distributed among the allocated slots. It would be interesting to know more about your job: if the upstream operator does some shuffling, you might run into the issue that the task executions are no longer distributed evenly. On Tue, Mar 23, 2021 at 1:42 PM Matthias Pohl <[hidden email]> wrote:
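As a hedged aside (not from the thread): for anyone who wants to experiment with this option locally, it can be passed to a local environment via a Configuration. The setup below is a minimal sketch, assuming the option key from the Flink docs:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SpreadOutSlotsLocal {
        public static void main(String[] args) {
            // Minimal sketch: a local environment with the spread-out slot strategy
            // enabled. The option influences slot allocation across TaskManagers,
            // not how tasks are placed within the allocated slots.
            Configuration conf = new Configuration();
            conf.setString("cluster.evenly-spread-out-slots", "true");
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.createLocalEnvironment(16, conf);
            // ... build and execute the job on env as usual ...
        }
    }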
Hi Matthias, thanks for your reply. Yes, in my case the upstream operator of the operator that is not distributed evenly among the task managers is a Flink Kafka connector with a rebalance() (shuffling). Regards, Vignesh On Tue, 23 Mar, 2021, 6:48 pm Matthias Pohl, <[hidden email]> wrote:
I think Flink currently doesn't support your case. Another idea is to set the parallelism of all operators to 64; then the tasks will be evenly distributed across the two TaskManagers. Vignesh Ramesh <[hidden email]> wrote on Thu, Mar 25, 2021 at 1:05 AM:
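In sketch form, continuing the hypothetical job from the first message (env and props as set up there; topic and map function are still assumptions):

    // Sketch of this suggestion: drop the per-operator setParallelism(16) so the
    // mapper inherits the job-wide parallelism of 64. With 64 source and 64 mapper
    // subtasks pairing up 1:1 in shared slots, each TaskManager should get 32.
    env.setParallelism(64);

    env.addSource(new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props))
       .name("kafka-source")
       .rebalance()
       .map(String::toUpperCase)
       .name("mapper")              // no setParallelism() call -> runs at 64
       .print();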
Hi Vignesh, if I understand you correctly, then you have a job like: KafkaSources (parallelism = 64) => Mapper (parallelism = 16) => something else.

Moreover, you probably have slot sharing enabled, which means that a KafkaSource and a Mapper can be deployed into the same slot. So what happens before scheduling is that Flink calculates which Mapper can run together with which KafkaSource. Since the KafkaSource shuffles the data, any Mapper could run with any KafkaSource. This means that Flink pairs Mapper_1 with KafkaSource_1, Mapper_2 with KafkaSource_2, ..., Mapper_16 with KafkaSource_16.

Then Flink tries to allocate the required slots for the topology. When selecting the slots for the KafkaSources, it tries to spread them out evenly. Since the parallelism of the sources is equal to the total number of slots in the cluster, this is not really visible. Because the order in which the KafkaSources are assigned their slots is not defined, the Mapper_i end up on arbitrary TaskManagers. That's what you are observing.

I think one could improve the situation if Flink tried to allocate slots starting with subtask index 1, 2, 3, and so on. If you set the parallelism of the KafkaSources to 16, then you should see an evenly spread allocation of slots.

Cheers, Till On Tue, Mar 30, 2021 at 10:05 AM yidan zhao <[hidden email]> wrote:
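A sketch of Till's last suggestion, again continuing the assumed job from the start of the thread: matching the source parallelism to the mapper's means each Mapper_i shares a slot with KafkaSource_i, so only 16 slots are requested in total and the spread-out strategy should be able to balance them 8/8 across the two TaskManagers.

    env.addSource(new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props))
       .name("kafka-source")
       .setParallelism(16)          // match the mapper's parallelism
       .rebalance()
       .map(String::toUpperCase)
       .name("mapper")
       .setParallelism(16)          // source_i and mapper_i now share slot i
       .print();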