Hi Flink Community,
I'm currently running a heavy Flink job on Flink 1.9.3 that has a lot of subtasks, and I'm observing some subtask distribution issues. The job in question has 9288 subtasks running on a large set of TaskManagers (1792 available slots in total). I'm using the cluster.evenly-spread-out-slots configuration option to have slots allocated uniformly, but I am still seeing non-uniform subtask distribution that seems to be affecting performance. Some TaskManagers appear to be overloaded and receive a much greater than uniform share of subtasks. I've been trying to reproduce this situation at a smaller scale but have not been successful so far.

While debugging the scheduling process at the smaller scale, I observed that the selectWithoutLocationPreference override introduced by the evenly-spread-out strategy is not being invoked at all, because the execution vertices still have a location preference to be assigned the same slots as their input vertices. This happens at job startup time, not during recovery, so I'm not sure whether recovery is where the no-preference code path is invoked. In essence, the only effect of using the evenly-spread-out strategy seems to be a slightly different candidate score calculation. I wanted to know:

1. Is the evenly-spread-out strategy the right option for achieving a uniform distribution of subtasks?
2. Is the observed scheduling behaviour expected for the evenly-spread-out strategy? When do we expect the no-location-preference code path to be invoked? For us this only happens on sources, since they have no incoming edges.

Apart from this, I am still trying to understand the nature of scheduling in Flink and how it could bring about this situation. I was wondering if there are known issues or peculiarities of the Flink job scheduler that could lead to it, for example the known issues mentioned in https://issues.apache.org/jira/browse/FLINK-11815. I was hoping to understand:

1. The conditions that give rise to these kinds of situations and how to verify whether we are running into them, for example how to verify that key group allocation is non-uniform.
2. Whether these issues have been addressed in subsequent versions of Flink.
3. Whether there is anything else about the nature of job scheduling in Flink that could give rise to the non-uniform distribution observed.

Please let me know if further information is needed.

Thanks,
Harshit
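One way to check how the subtasks are actually spread across TaskManagers is to query the JobManager REST API and count subtasks per host. The sketch below is illustrative only: the REST address, the job id placeholder, and the "host" field in the per-vertex subtask listing are assumptions about the 1.9 REST API and may need adjusting for your setup.

    import json
    import urllib.request
    from collections import Counter

    REST_URL = "http://localhost:8081"   # assumed JobManager REST address
    JOB_ID = "replace-with-your-job-id"  # placeholder, take it from the Web UI

    def get(path):
        # Fetch and decode one JobManager REST endpoint.
        with urllib.request.urlopen(REST_URL + path) as resp:
            return json.loads(resp.read().decode("utf-8"))

    # Count subtasks per TaskManager host across all job vertices.
    per_host = Counter()
    job = get("/jobs/" + JOB_ID)
    for vertex in job["vertices"]:
        details = get("/jobs/" + JOB_ID + "/vertices/" + vertex["id"])
        for subtask in details.get("subtasks", []):
            per_host[subtask.get("host", "unknown")] += 1

    for host, count in per_host.most_common():
        print(host, count)

A strongly skewed output (a few hosts with far more subtasks than the rest) would confirm the uneven distribution described above.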
Hi Harshit,

the cluster.evenly-spread-out-slots strategy works the following way. If you schedule a task without preferred inputs (e.g. it has no inputs, or too many inputs; I think the threshold is 8 inputs), then it will pick a slot from the TaskManager with the lowest utilization. If, however, the task has an input preference, then the input preference takes precedence over the utilization. In fact, the utilization is only a tie breaker in case there are multiple slots that are local to the inputs or on the same hosts.

Moreover, the spread-out strategy only works for the currently available set of TaskManagers. If you are running on Yarn, Mesos or K8s, where Flink can dynamically allocate new TMs, then the system will first fill up the available TMs before it allocates new ones. The cluster.evenly-spread-out-slots strategy won't give you a guaranteed spread-out of slots but acts more as a heuristic.

If you want all your tasks to be evenly spread out across the cluster, then I would suggest setting the parallelism of your sources to a multiple of the number of TMs. That way the sources will be spread out, and thus also the consumers of these sources.

For your other questions:

1. The key group allocation will be non-uniform iff max-parallelism % parallelism != 0. If this is the case, then you will have tasks which own one additional key group compared to some of the other tasks. Depending on the actual values and the number of slots s on a TaskManager, it can happen that you process s more key groups on TM1 compared to TM2.
2. No, the behaviour is still the same.
3. In order to answer this question you would have to give us a bit more information about the actual distribution of tasks (which tasks are running where).

Cheers,
Till
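To make the key-group point in (1) concrete, here is a small sketch of how the default key-group ranges per subtask can be computed. The arithmetic mirrors, to my understanding, Flink's KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex, but treat it as a sketch rather than the authoritative implementation; the max-parallelism and parallelism values are just example numbers.

    def key_group_range(max_parallelism, parallelism, operator_index):
        # Key-group range [start, end] owned by one subtask under the default
        # assignment (sketch of Flink's computeKeyGroupRangeForOperatorIndex).
        start = (operator_index * max_parallelism + parallelism - 1) // parallelism
        end = ((operator_index + 1) * max_parallelism - 1) // parallelism
        return start, end

    max_parallelism = 128  # example value; use your job's max parallelism
    parallelism = 48       # example value; use the operator's parallelism

    sizes = [end - start + 1
             for start, end in (key_group_range(max_parallelism, parallelism, i)
                                for i in range(parallelism))]
    print("distinct key-group counts per subtask:", sorted(set(sizes)))
    # If max_parallelism % parallelism != 0 this prints two values,
    # i.e. some subtasks own one more key group than others.

If a TaskManager happens to host s of the "heavier" subtasks, it processes up to s more key groups than a TaskManager hosting only the smaller ones, which is the effect described in point (1) above.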