Flink on YARN - how to resize a running cluster?


Josh
I'm running a Flink cluster as a YARN application, started by:
./bin/yarn-session.sh -n 4 -jm 2048 -tm 4096 -d

There are 2 worker nodes, so each is allocated 2 task managers. A stateful Flink job is running on the cluster with a parallelism of 2.

If I now want to increase the number of worker nodes to 3, and add 2 extra task managers, and then increase the job parallelism, how should I do this?

I'm using EMR, so adding an extra worker node and making it available to YARN is easy to do via the AWS console. But I haven't been able to find any information in the Flink docs about how to resize a running Flink cluster on YARN. Is it possible to resize it while the YARN application is running, or do I need to stop the YARN application and redeploy the cluster? Also, do I need to redeploy my Flink job from a savepoint to increase its parallelism, or can I do this while the job is running?

I tried redeploying the cluster having added a third worker node, via:

> yarn application -kill myflinkcluster

> ./bin/yarn-session.sh -n 6 -jm 2048 -tm 4096 -d

(note the increase in the number of task managers from 4 to 6)

Surprisingly, this redeployed a Flink cluster with 4 task managers (not 6!) and restored my job from the last checkpoint.

Can anyone point me in the right direction?

Thanks,

Josh



Re: Flink on YARN - how to resize a running cluster?

Till Rohrmann
Hi Josh,

At the moment it is not possible to dynamically increase the parallelism of your job. The same holds true for restarting a job from a savepoint, but we're currently working on exactly this. So in order to change the parallelism of your job, you would have to restart the job from scratch.
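A minimal sketch of the restart described above, written as a dry run that only prints the commands. `flink list`, `flink cancel`, and `flink run -p` are standard Flink CLI commands; the job ID, the jar path, and the helper name `print_restart_commands` are hypothetical placeholders. Because the job is restarted from scratch, it does not pick up its previous state.

```shell
#!/bin/sh
# Dry run: print the commands for restarting a job with a new parallelism.
# All three arguments are placeholders supplied by the caller.
print_restart_commands() {
    job_id="$1"       # ID of the running job, as shown by `./bin/flink list`
    jar="$2"          # path to the job jar (hypothetical)
    parallelism="$3"  # new job-wide parallelism

    echo "./bin/flink cancel ${job_id}"
    echo "./bin/flink run -p ${parallelism} ${jar}"
}

print_restart_commands "<jobID>" ./my-job.jar 4
```

Printing the commands first makes it easy to review them before running anything against a live cluster.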

Adding task managers dynamically to your running Flink cluster is possible if you allocate new YARN containers and then start a TaskManager process manually with the current job manager address and port. You can find the address and port either in the web dashboard under the job manager configuration or in the .yarn-properties file, which is stored in the temp directory on your machine. The easier way, however, would be to stop your YARN session and restart it with an increased number of containers, because then you wouldn't have to manually ship the lib directory, which might contain user code classes.
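The properties-file lookup can be sketched in shell. This assumes the file uses Java-style `key=value` lines and that the job manager address is stored under a key named `jobManager`; the key name and the exact file path are assumptions, so check the file your Flink version actually writes. `read_property` is a hypothetical helper, not part of Flink.

```shell
#!/bin/sh
# Print the value of KEY from a Java-style properties file (key=value lines).
# Only the first matching line is used. Java's Properties.store writes ':'
# as '\:', so that escape is undone in the output.
read_property() {
    file="$1"
    key="$2"
    grep "^${key}=" "$file" | head -n 1 | cut -d '=' -f 2- | sed 's/\\:/:/g'
}

# Assumed location and key name; adjust to what your session actually wrote.
props="${TMPDIR:-/tmp}/.yarn-properties"
if [ -f "$props" ]; then
    read_property "$props" jobManager
fi
```

If the key is missing, the function prints nothing, so an empty result is a hint to inspect the file by hand or fall back to the web dashboard.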

Cheers,
Till

(In reply to Josh's message of Wed, Jun 29, 2016 at 10:13 PM.)




Re: Flink on YARN - how to resize a running cluster?

Josh
Hi Till,

Thanks, that's very helpful!
So I guess in that case, since it isn't possible to increase the job parallelism later, it might be sensible to use, say, 10x the parallelism that I need right now (even if only running on a couple of task managers), so that it's possible to scale the job in the future if I need to?

Josh

(In reply to Till Rohrmann's message of Thu, Jun 30, 2016 at 11:17 AM.)





Re: Flink on YARN - how to resize a running cluster?

Márton Balassi
Hi Josh,

Yes, currently that is a reasonable workaround.
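As a rough illustration of this workaround: with default slot sharing, a job needs at least as many task slots as its parallelism, so a small cluster has to offer more slots per task manager to run an over-provisioned job. The sketch below computes that number. `slots_per_tm` is a hypothetical helper, and the parallelism of 20 is an example value (10x the parallelism of 2 mentioned earlier in the thread). `-s` is the `yarn-session.sh` option for the number of slots per task manager.

```shell
#!/bin/sh
# Slots each task manager must offer so that at least `parallelism` slots
# exist in total: integer ceiling of parallelism / task_managers.
slots_per_tm() {
    parallelism="$1"
    task_managers="$2"
    echo $(( (parallelism + task_managers - 1) / task_managers ))
}

# Example values (assumptions): parallelism 20 on the 4 task managers
# from the original yarn-session.sh command.
slots="$(slots_per_tm 20 4)"
echo "./bin/yarn-session.sh -n 4 -s ${slots} -jm 2048 -tm 4096 -d"
```

The slots of one task manager share that task manager's memory, so a higher slot count on the same `-tm` setting leaves less memory per slot.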

Best,

Marton

(In reply to Josh's message of Thu, Jun 30, 2016 at 12:38 PM.)






Re: Flink on YARN - how to resize a running cluster?

Till Rohrmann
Yes, that's the way to go at the moment.

Cheers,
Till

(In reply to Márton Balassi's message of Thu, Jun 30, 2016 at 12:47 PM.)