Setting operator parallelism of a running job - Flink 1.2

Setting operator parallelism of a running job - Flink 1.2

Dominik Safaric
Hi all,

Is it possible to set the operator parallelism using the Flink CLI while a job is running?

I have a cluster of 4 worker nodes, where each node has 4 CPUs, hence the number of task slots per node is set to 4 and parallelism.default to 16.

However, when a worker fails while the job is configured at the system level to run with 16 task slots, the exception “Not enough free slots available to run the job.” is raised and the job cannot continue; instead it aborts.

Is this the expected behaviour? Shouldn’t Flink continue the job execution with, in this case, only 12 slots available? If not, can one change the parallelism of a job while it is in restart mode in order to allow the job to continue?

Thanks,
Dominik
Re: Setting operator parallelism of a running job - Flink 1.2

Aljoscha Krettek
Hi,
changing the parallelism is not possible while a job is running (currently). To change the parallelism you have to create a savepoint and then restore from that savepoint with a different parallelism (see the CLI sketch below).


Best,
Aljoscha
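
For reference, the workflow with the Flink 1.2 CLI looks roughly like this; the job ID, savepoint path, and jar name are placeholders, not values from this thread:

    # Trigger a savepoint for the running job
    # (find the job ID via `bin/flink list`)
    bin/flink savepoint <jobID>

    # Cancel the job; in Flink 1.2, `bin/flink cancel -s <jobID>` can
    # alternatively take a savepoint and cancel in one step
    bin/flink cancel <jobID>

    # Resubmit from the savepoint with a different parallelism, e.g. 12
    bin/flink run -s <savepointPath> -p 12 myJob.jar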

Re: Setting operator parallelism of a running job - Flink 1.2

Dominik Safaric
Hi Aljoscha,

In other words, jobs must be restarted manually? 

What about using maxParallelism() at the client level? I would expect it to be complementary to parallelism.default, in the sense of letting Flink manage the parallelism of operators and change it in accordance with runtime conditions. However, that is not the case.

Best,
Dominik

Re: Setting operator parallelism of a running job - Flink 1.2

Aljoscha Krettek
Hi,
both savepoints and checkpoints use the configured state backend. Right now, the only difference between a checkpoint and a savepoint is that a savepoint has additional metadata stored with it that makes it persistent and relocatable. In the future, though, the (on-disk) formats of savepoints and checkpoints will diverge.

Best,
Aljoscha
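
To make that concrete, here is a minimal sketch of configuring a state backend at the client (Flink 1.2 Java API); the checkpoint URI and the trivial pipeline are placeholders, not taken from this thread:

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointedJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // Take periodic checkpoints every 60 seconds.
            env.enableCheckpointing(60000);

            // The same backend is used for both periodic checkpoints and
            // user-triggered savepoints; the URI below is a placeholder.
            env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

            // ... build the actual job graph here ...
            env.fromElements(1, 2, 3).print();

            env.execute("checkpointed-job");
        }
    }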
On 21. Apr 2017, at 16:09, Dominik Safaric <[hidden email]> wrote:

But what is then the difference between savepoints and checkpoints as configured by using e.g. the StreamExecutionEnvironment’s setStateBackend() function?

Best,
Dominik

On 21 Apr 2017, at 15:53, Aljoscha Krettek <[hidden email]> wrote:

Correct, the max parallelism only sets an upper bound on how high you can set the parallelism in the future (by restoring from a savepoint).

Internally, the keyed state is partitioned into key groups, with as many key groups as the max parallelism. Key groups are the unit of state that can be redistributed when the parallelism changes, which is why they define the upper bound.

Best,
Aljoscha
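
A minimal sketch of how the two settings relate at the client level (Flink 1.2 Java API); the values and the toy pipeline are illustrative only:

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RescalableJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // Current parallelism; can only be changed by restoring from a
            // savepoint with a new value (e.g. `flink run -s ... -p ...`).
            env.setParallelism(16);

            // Upper bound for any future rescaling: fixes the number of key
            // groups the keyed state is split into, and cannot be raised
            // later without giving up that state.
            env.setMaxParallelism(128);

            env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2))
               .keyBy(0)   // keyed state, partitioned into key groups
               .sum(1)
               .print();

            env.execute("rescalable-job");
        }
    }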