Hi all,
I must say I'm very impressed by Flink and what it can do. I was playing around with Flink operator parallelism and scalability and I have a few questions on this subject.

My setup is:
1. Flink 1.9.1
2. Docker Job Cluster, where each Task Manager has only one task slot. I'm following [1].
3. env setup:
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000));
env.setParallelism(1);
env.setMaxParallelism(128);
env.enableCheckpointing(10 * 60 * 1000);

Please note that I am using operator chaining here.

My pipeline setup: <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2311/Capture2.png>

As you can see, I have 7 operators (a few of them were actually chained, which is fine) with different parallelism levels. This gives me 23 tasks in total.

I've noticed that with the "one task manager = one task slot" approach I have to have 6 task slots/task managers to be able to start this pipeline: <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2311/Capture1.png>

If the number of task slots is lower than 6, the job is scheduled but never starts. With 6 task slots everything works fine, and I must say I'm very impressed with the way Flink balanced data between task slots. Data was distributed very evenly between operator instances/tasks.

In this setup (7 operators, 23 tasks and 6 task slots), some task slots have to be reused by more than one operator. While inspecting the UI I found examples of such operators, which is what I was expecting.

However, I was a little surprised after I added one additional task manager (hence one new task slot): <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2311/Capture3.png>

After adding the new resource, Flink did not rebalance/redistribute the graph, so this host was sitting there doing nothing. Even after putting some load on the cluster, this node was still not used.

After doing this exercise I have a few questions:
1. It seems that the number of task slots must be equal to or greater than the maximum parallelism used in the pipeline. In my case it was 6. When I changed the parallelism of one of the operators to 7, I had to have 7 task slots (task managers in my setup) to be able to even start the job. Is this the case?
2. What can I do to use the extra node that was spawned while the job was running? In other words, if I saw that one of my nodes had too much load, what could I do? Please note that I'm using a keyBy/hashing function in my pipeline and in my tests I had around 5000 unique keys.

I tried to use the REST API to call "rescale" but I got this response:
302 {"errors":["Rescaling is temporarily disabled. See FLINK-12312."]}

Thanks.

[1] https://github.com/apache/flink/blob/release-1.9/flink-container/docker/README.md
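To make the setup above concrete, here is a stripped-down sketch of a job with the same kind of configuration. The socket source, operator bodies, class names and parallelism values are placeholders for illustration only, not my real pipeline (that one is in the screenshots):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismSketchJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000)); // 1000 attempts, 1000 ms delay
        env.setParallelism(1);        // default parallelism for operators that don't set their own
        env.setMaxParallelism(128);   // upper bound (number of key groups) for rescaling keyed state
        env.enableCheckpointing(10 * 60 * 1000); // checkpoint every 10 minutes

        // placeholder source; socket sources always run with parallelism 1
        DataStream<String> input = env.socketTextStream("localhost", 9999);

        input
            .keyBy(new KeySelector<String, String>() {
                @Override
                public String getKey(String value) {
                    return value; // hash-partitions records by key (in my tests ~5000 unique keys)
                }
            })
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String value) {
                    return value.toUpperCase(); // placeholder business logic
                }
            })
            .setParallelism(6)   // the "widest" vertex -> with default slot sharing the job needs 6 slots
            .print()
            .setParallelism(2);

        env.execute("parallelism-sketch");
    }
}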
Hi KristoffSC,

Each task needs a slot to run. However, Flink enables slot sharing [1] by default, so that one slot can host one parallel instance of each task in a job. That's why your job can start with 6 slots. Different parallel instances of the same task, on the other hand, cannot share a slot. That's why you need at least 6 slots to run your job.

You can put tasks into different slot sharing groups via '.slotSharingGroup(xxx)' to force certain tasks not to share slots, so that they do not burden each other. However, in that case the job will need more slots to start.

So for your questions:
#1 Yes.
#2 At the moment you will need to resubmit your job with the adjusted parallelism. The rescale CLI was experimental and has been temporarily removed [2].

Thanks,
Zhu Zhu
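For illustration, a minimal sketch of what that could look like (the source, operator logic and group names are just placeholders):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlotSharingGroupSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.socketTextStream("localhost", 9999) // placeholder source
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String value) {
                    return value.trim(); // placeholder logic
                }
            })
            // this task will not share slots with tasks in the "default" group;
            // downstream operators inherit the group unless they set their own
            .slotSharingGroup("heavy")
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String value) {
                    return value.toUpperCase(); // placeholder logic
                }
            })
            .slotSharingGroup("default") // put the rest of the pipeline back into the default group
            .print();

        env.execute("slot-sharing-group-sketch");
    }
}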
Hi KristoffSC,

As Zhu Zhu explained, Flink does not currently auto-scale a job as new resources become available. Instead the job must be stopped via a savepoint and restarted with a new parallelism (this is what the old experimental rescale CLI used to perform).

Making Flink reactive to new resources and auto-scaling jobs is something I'm currently very interested in. An approach on how to change Flink to support this has previously been outlined/discussed in FLINK-10407 (https://issues.apache.org/jira/browse/FLINK-10407).

David
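As a side note, and only as a sketch: one way to make such a resubmission easier is to read the parallelism from the program arguments (e.g. a hypothetical "--parallelism" argument parsed with ParameterTool), so the jar itself does not have to change between runs:

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ResubmitWithNewParallelism {

    public static void main(String[] args) throws Exception {
        // e.g. pass "--parallelism 7" when resubmitting the job after new task slots were added
        ParameterTool params = ParameterTool.fromArgs(args);
        int parallelism = params.getInt("parallelism", 6); // hypothetical parameter name and default

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(parallelism); // default parallelism for this run
        env.setMaxParallelism(128);      // keyed state can be redistributed up to this bound when rescaling

        env.socketTextStream("localhost", 9999) // placeholder pipeline
            .print();

        env.execute("resubmit-with-new-parallelism");
    }
}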
Thank you David and Zhu Zhu,
this helps a lot. I have follow-up questions though.

Given "Instead the Job must be stopped via a savepoint and restarted with a new parallelism" and the slot sharing [1] feature, I got the impression that if I started my cluster with more than 6 task slots, Flink would try to deploy tasks across all resources, i.e. use all available resources during job submission.

I did two tests with my original job:
1. I started a Job Cluster with 7 task slots (7 task managers, since in this case 1 task manager has one task slot).
2. I started a Session Cluster with 28 task slots in total. In this case I had 7 task managers with 4 task slots each.

For case 1 I used the "FLINK_JOB" variable as stated in [2]. For case 2 I submitted my job from the UI after Flink became operational.

In both cases the job used only 6 task slots, so it was still sharing task slots. I had expected that it would try to use as many of the available resources as it can. What do you think about this?

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#task-slots-and-resources
[2] https://github.com/apache/flink/blob/release-1.9/flink-container/docker/README.md
Hi KristoffSC,

Did you increase the parallelism of the vertex that has the largest parallelism? Or did you explicitly put tasks into different slot sharing groups?

With the default slot sharing, the number of slots required/used equals the maximum parallelism over all JobVertices, which is 6 in your case. For example, with vertex parallelisms of 1, 2 and 6, one parallel instance of each vertex can be packed into each slot, so 6 slots are enough; the job will only request a 7th slot if some vertex has parallelism 7.
Hi Zhu Zhu,
well, in my last test I did not change the job config, so I did not change the parallelism level of any operator and I did not change the policy regarding slot sharing (it stays the default one). Operator chaining is set to true, without any extra actions like "start new chain", "disable chain", etc.

What I assumed, however, is that Flink will try to find the most efficient way to use the available resources during job submission.

In the first case, where I had only 6 task managers (which matches the max parallelism of my JobVertices), Flink reused some task slots. Adding extra task slots was not effective for the reasons described by David. This is understandable.

However, I was assuming that if I submit my job on a cluster that has more than 6 task managers, Flink will not share task slots by default. That did not happen; Flink deployed the job in the same way regardless of the extra resources.

So the conclusion is that simply resubmitting the job will not work in this case, and actually I can't have any certainty that it will, since in my case Flink still reuses task slots.

If this were a production case, I would have to do a test job submission on a testing environment and potentially change the job itself, not just the config, e.g. by adding slot sharing groups. So in a production case I would not be able to react fast; I would have to deploy a new version of my app/job, which could be problematic.
Hi KristoffSC
As Zhu said, Flink enables slot sharing [1] by default. This feature has nothing to do with the resources of your cluster; the benefits of it are described in [1] as well. In other words, Flink will not detect how many slots your cluster has and adjust the slot sharing behavior to that number.

If you want to make the best use of your cluster, you can either increase the parallelism of the vertex that has the largest parallelism, or "disable" slot sharing as described in [2]. IMO, the first way matches your purpose.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#task-slots-and-resources
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/#task-chaining-and-resource-groups

Best,
Yangze Guo
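To make the first option concrete, here is a rough sketch (with a made-up source and operator, not your actual job) of raising the widest vertex so that it can occupy the extra slot:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WidenLargestVertex {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.socketTextStream("localhost", 9999) // placeholder source, parallelism 1
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String value) {
                    return value.toLowerCase(); // placeholder logic
                }
            })
            // if the widest vertex previously had parallelism 6, the job used 6 slots;
            // raising it to 7 makes the job request and use 7 slots
            .setParallelism(7)
            .print();

        env.execute("widen-largest-vertex");
    }
}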