How to distribute subtasks evenly across taskmanagers?

Sunny Yun
Why does Flink manage resources only in terms of slots, and not in terms of both TaskManagers and slots?

If multiple jobs are submitted to a single Flink cluster, how can I make the JobManager distribute their subtasks evenly across all TaskManagers?
Currently, the JobManager treats all slots as one global pool, so some jobs' operators end up in the slots of just one TM.


For example:

3 TaskManagers (taskmanager.numberOfTaskSlots: 8) = 24 slots in total
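A quick back-of-the-envelope check of these numbers, written as a small Java sketch. It assumes Flink's default slot sharing, under which a job whose operators all run at the same parallelism p occupies p slots. The class and method names are made up for illustration. With the parallelisms used in the jobs below (6, 12 and 6), the three jobs together request exactly as many slots as the cluster offers, so how those slots are placed decides which TaskManagers each job can use.

```java
// Hypothetical helper, not part of Flink: slot arithmetic for the example.
class SlotBudget {
    // Total slots offered by a cluster of identical TaskManagers.
    static int capacity(int taskManagers, int slotsPerTaskManager) {
        return taskManagers * slotsPerTaskManager;
    }

    // Slots requested by all jobs, assuming default slot sharing and that
    // every operator of a job runs at that job's parallelism.
    static int demand(int... jobParallelisms) {
        int total = 0;
        for (int p : jobParallelisms) {
            total += p;
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println("capacity = " + capacity(3, 8));   // 3 TMs x 8 slots
        System.out.println("demand   = " + demand(6, 12, 6)); // job1 + job2 + job3
    }
}
```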

// job1: every operator runs with parallelism 6
env
    .setParallelism(6)
    .addSource(sourceFunction)
    .partitionCustom(partitioner, keySelector)
    .map(mapper)
    .addSink(sinkFunction);
env.execute("job1");

// job2: every operator runs with parallelism 12
env
    .setParallelism(12)
    .addSource(sourceFunction)
    .partitionCustom(partitioner, keySelector)
    .map(mapper)
    .addSink(sinkFunction);
env.execute("job2");

// job3: every operator runs with parallelism 6
env
    .setParallelism(6)
    .addSource(sourceFunction)
    .partitionCustom(partitioner, keySelector)
    .map(mapper)
    .addSink(sinkFunction);
env.execute("job3");
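To make the problem concrete, here is a toy model (plain Java, not Flink's actual scheduler) of a placement policy that treats all 24 slots as one pool and simply fills TaskManagers in order. It assumes slot sharing, so one slot holds one source subtask plus one map/sink subtask; the class name PackPlacement is hypothetical. Submitting the three jobs in order puts all of job1 on TM1 and all of job3 on TM3, which is the kind of concentration described above.

```java
import java.util.Arrays;

// Toy model (hypothetical, not Flink code): each TaskManager is reduced to a
// count of free slots, and a "pack" policy fills TaskManagers in index order.
class PackPlacement {
    // Places `slots` slot requests; returns how many landed on each TM.
    // Mutates `free` so that later jobs see the remaining capacity.
    static int[] place(int[] free, int slots) {
        int[] assigned = new int[free.length];
        for (int tm = 0; tm < free.length && slots > 0; tm++) {
            int take = Math.min(free[tm], slots);
            free[tm] -= take;
            assigned[tm] += take;
            slots -= take;
        }
        if (slots > 0) {
            throw new IllegalStateException("not enough free slots");
        }
        return assigned;
    }

    public static void main(String[] args) {
        int[] free = {8, 8, 8}; // TM1, TM2, TM3 with 8 slots each
        System.out.println("job1 " + Arrays.toString(place(free, 6)));
        System.out.println("job2 " + Arrays.toString(place(free, 12)));
        System.out.println("job3 " + Arrays.toString(place(free, 6)));
    }
}
```

With this policy job1 gets [6, 0, 0], job2 gets [2, 8, 2] and job3 gets [0, 0, 6] slots on TM1/TM2/TM3.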


Intended placement (subtasks per TaskManager):
                TM1 TM2 TM3
               --------------
job1-source     2   2   2
job1-map-sink   2   2   2
job2-source     4   4   4
job2-map-sink   4   4   4
job3-source     2   2   2
job3-map-sink   2   2   2
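For comparison, a toy spread-out policy (again hypothetical, not a Flink API) reproduces the intended table: each slot request goes to the TaskManager with the most free slots, with ties broken by the lowest index. Under the same slot-sharing assumption (one slot per source subtask plus map/sink subtask), submitting job1, job2 and job3 in that order yields 2/2/2, 4/4/4 and 2/2/2 slots per TaskManager.

```java
import java.util.Arrays;

// Toy model (hypothetical, not Flink code): every slot request is sent to the
// TaskManager that currently has the most free slots (ties -> lowest index).
class SpreadPlacement {
    // Places `slots` slot requests; returns how many landed on each TM.
    // Mutates `free` so that later jobs see the remaining capacity.
    static int[] place(int[] free, int slots) {
        int[] assigned = new int[free.length];
        for (int s = 0; s < slots; s++) {
            int best = -1;
            for (int tm = 0; tm < free.length; tm++) {
                // Strict '>' keeps the lowest index among equally free TMs.
                if (free[tm] > 0 && (best < 0 || free[tm] > free[best])) {
                    best = tm;
                }
            }
            if (best < 0) {
                throw new IllegalStateException("not enough free slots");
            }
            free[best]--;
            assigned[best]++;
        }
        return assigned;
    }

    public static void main(String[] args) {
        int[] free = {8, 8, 8}; // TM1, TM2, TM3 with 8 slots each
        System.out.println("job1 " + Arrays.toString(place(free, 6)));
        System.out.println("job2 " + Arrays.toString(place(free, 12)));
        System.out.println("job3 " + Arrays.toString(place(free, 6)));
    }
}
```

The greedy rule only produces a perfectly even split here because every parallelism is divisible by the number of TaskManagers and all TaskManagers start with the same number of free slots; otherwise the split is only approximately even.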


Because each job comes under heavy load at unpredictable times, it is important that every job can use the resources of all TaskManagers.
As a temporary workaround we set up three separate clusters (with 6, 6, and 12 slots in total, respectively), but this is not a clean solution.


Best, Sunny

Re: How to distribute subtasks evenly across taskmanagers?

Till Rohrmann
Hi Sunny,

this is a current limitation of Flink's scheduling. We are currently working on extending Flink's scheduling mechanism [1], which should also help solve this problem. For the moment, I recommend using the per-job mode so that each job runs in its own cluster.


Cheers,
Till

On Wed, Dec 5, 2018 at 2:07 AM Sunny Yun wrote: [original message quoted in full; see above]