Dynamically creating new Task Managers in YARN

Piper Piper
Hi,

How can I make Flink's Resource Manager request YARN to spin up new (or destroy/reclaim existing) TaskManagers in YARN containers?

Preferably at runtime (i.e. dynamically).

Thank you

Piper

Re: Dynamically creating new Task Managers in YARN

vino yang
Hi Piper,

Can you share more about the reasons behind your requirement, and some details?

Best,
Vino

Re: Dynamically creating new Task Managers in YARN

Piper Piper
Hi Vino,

I want to implement Resource Elasticity. In doing so, I have read that Flink with YARN has two modes: Job and Session.

In Job mode, Flink's Resource Manager requests containers for TMs from YARN, and then gives the containers back to YARN when the job completes.

In Session mode, Flink holds on to persistent TMs.

I want to combine the advantages of Job and Session mode, i.e. Flink will have persistent TMs/containers and request more TMs/containers from YARN when needed (or release TMs/containers back to YARN).

Thank you,

Piper


Re: Dynamically creating new Task Managers in YARN

vino yang
Hi Piper,

Your understanding of the two deployment modes for Flink on YARN is right.

AFAIK, the single-job (job cluster) mode is more popular than session mode.

That is because in job cluster mode Flink lets YARN manage resources as far as possible, and this mode keeps the job isolated from other jobs.

IMO, we do not need to combine their advantages. Let YARN do the things that it is good at. What do you think?

Best,
Vino


Re: Dynamically creating new Task Managers in YARN

Jingsong Li
Hi Piper and Vino:

In the current Flink version, the resources of a Flink session cluster are unrestricted: if the requested resources exceed those owned by the current session, it will ask YARN's RM for new resources.
If a TaskManager is idle for too long, the JM releases it back to YARN. This behavior is controlled by resourcemanager.taskmanager-timeout. You can set a suitable value for it to get the benefits of both process reuse and dynamic resources.

From this point of view, I think session mode is a good choice.
Is this what you want, Piper?
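
As a rough illustration of the idle-timeout behavior described above, here is a small Python model. It is not Flink code: the `TaskManager` record and the `releasable` helper are hypothetical names made up for this sketch, and the 30000 ms value is assumed to be the default of resourcemanager.taskmanager-timeout (worth checking against the documentation of the Flink version in use).

```python
from dataclasses import dataclass
from typing import List, Optional

# In flink-conf.yaml the real setting would look like:
#   resourcemanager.taskmanager-timeout: 30000
# The value is in milliseconds; 30000 is assumed here, not taken from the thread.
IDLE_TIMEOUT_MS = 30_000


@dataclass
class TaskManager:
    """Hypothetical record for one TaskManager container."""
    name: str
    idle_since_ms: Optional[int]  # None means it still runs at least one task


def releasable(tms: List[TaskManager], now_ms: int,
               timeout_ms: int = IDLE_TIMEOUT_MS) -> List[str]:
    """Return the names of TaskManagers idle for at least timeout_ms."""
    return [
        tm.name
        for tm in tms
        if tm.idle_since_ms is not None
        and now_ms - tm.idle_since_ms >= timeout_ms
    ]


if __name__ == "__main__":
    cluster = [
        TaskManager("tm-1", None),    # busy, never released
        TaskManager("tm-2", 0),       # idle since t = 0 ms
        TaskManager("tm-3", 25_000),  # idle since t = 25 s
    ]
    print(releasable(cluster, now_ms=40_000))
```

A larger timeout keeps idle TaskManagers around longer for reuse by the next job; a smaller one returns containers to YARN sooner.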

Best,
Jingsong Lee



Re: Dynamically creating new Task Managers in YARN

vino yang
Hi Jingsong,

Thanks for explaining how the new Flink session cluster mode works.

I mostly use job cluster mode, so I did not know the new session cluster mode well.

Best,
Vino

Re: Dynamically creating new Task Managers in YARN

Piper Piper
Hi Jingsong,

Thank you for your reply!

>Is this what you want? Piper.  

Yes. This is exactly what I want.

Is there any way for me to tell Flink's RM how many resources to request from YARN's RM, and to have Flink's RM request resources proactively before it runs out?
Similarly, is there any way I can force the JM to release a TM back to YARN before the timeout?

Or will I need to modify the source code of Flink for this?

Thank you,

Piper

Re: Dynamically creating new Task Managers in YARN

Jingsong Li
Hi Piper,

AFAIK, there are no such flexible operations. You can get some information from metrics, but you cannot control these things.
You would probably need to modify some source code in flink-yarn.
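
For the read-only side, a minimal Python sketch of turning a cluster overview into slot usage is shown below. Everything here is an assumption rather than something stated in this thread: the field names (`taskmanagers`, `slots-total`, `slots-available`) follow the shape the JobManager REST endpoint `/overview` is believed to return, the sample payload is invented for illustration, and `slot_usage` is a hypothetical helper.

```python
import json

# Invented sample payload, shaped like what /overview is assumed to return.
SAMPLE_OVERVIEW = '{"taskmanagers": 3, "slots-total": 12, "slots-available": 5}'


def slot_usage(overview_json: str) -> dict:
    """Summarize TaskManager count plus used and free slots from an overview."""
    data = json.loads(overview_json)
    total = data["slots-total"]
    free = data["slots-available"]
    return {
        "taskmanagers": data["taskmanagers"],
        "used": total - free,
        "free": free,
    }


if __name__ == "__main__":
    print(slot_usage(SAMPLE_OVERVIEW))
```

This only observes the cluster. As noted above, it gives no way to ask for more containers or to release them early.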

Best,
Jingsong Lee


Re: Dynamically creating new Task Managers in YARN

Piper Piper
Thank you, I will check it out. 

Re: Dynamically creating new Task Managers in YARN

Yang Wang
Hi Piper,

Jingsong is right. In both per-job and session clusters, the YarnResourceManager allocates
TaskManager containers dynamically on demand.

For a per-job cluster, it allocates TaskManagers based on the job's slot demand. Excess
containers are returned to YARN immediately. When the job finishes, the JobManager and all
TaskManagers are released.
For a session cluster, the YarnResourceManager does not have any TaskManagers when it starts.
Once a job is submitted, it allocates the TaskManagers. When the job finishes, the
TaskManagers become idle and are released after the timeout. The JobManager keeps
running until the session is stopped manually.
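
To make "based on the job's slot demand" concrete, here is a minimal Python sketch of the arithmetic, assuming every TaskManager offers the same number of slots (the value configured as taskmanager.numberOfTaskSlots). The function name `taskmanagers_needed` is hypothetical, and the real allocation logic in the YarnResourceManager may differ in detail.

```python
def taskmanagers_needed(slot_demand: int, slots_per_tm: int) -> int:
    """Smallest number of equally sized TaskManagers covering slot_demand."""
    if slot_demand < 0:
        raise ValueError("slot_demand must not be negative")
    if slots_per_tm < 1:
        raise ValueError("slots_per_tm must be at least 1")
    # Ceiling division using integers only.
    return (slot_demand + slots_per_tm - 1) // slots_per_tm


if __name__ == "__main__":
    # A job needing 10 slots on TaskManagers with 4 slots each.
    print(taskmanagers_needed(10, 4))
```

In this model a job that needs 10 slots with 4 slots per TaskManager leads to 3 containers, with 2 slots left free in the last one.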

I'm just curious why you want to control the number of TaskManagers, since they are
always allocated on demand.


Best,
Yang

Re: Dynamically creating new Task Managers in YARN

Piper Piper
Hello Yang,

Thank you for the explanation!

I want to control the number of TaskManagers in order to have finer control over allowing/rejecting certain jobs in the cluster.

In Session mode with multiple jobs, is there any way to control whether Flink will fit a new job into empty slots in existing TaskManagers versus starting new TaskManagers for every new job?

Thank you,

Piper 

Re: Dynamically creating new Task Managers in YARN

Yang Wang
Hi Piper,

In session mode, Flink will always use the free slots in the existing TaskManagers first.
When they cannot fulfill the slot request, new TaskManagers will be started.
Did you run into any exceptions?
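
The "free slots first, new TaskManagers second" rule can be sketched in Python as follows. This is a simplified model, not Flink's scheduler: `place_slots` is a hypothetical helper, existing TaskManagers are visited in dictionary order, and new TaskManagers are assumed to all have `slots_per_tm` slots.

```python
from typing import Dict, Tuple


def place_slots(free_slots: Dict[str, int], requested: int,
                slots_per_tm: int) -> Tuple[Dict[str, int], int]:
    """Return (slots taken per existing TaskManager, new TaskManagers to start)."""
    taken: Dict[str, int] = {}
    remaining = requested
    # Step 1: use free slots in TaskManagers that are already running.
    for name, free in free_slots.items():
        if remaining == 0:
            break
        use = min(free, remaining)
        if use > 0:
            taken[name] = use
            remaining -= use
    # Step 2: only the leftover demand triggers new containers.
    new_tms = (remaining + slots_per_tm - 1) // slots_per_tm
    return taken, new_tms


if __name__ == "__main__":
    # Two running TaskManagers with 1 and 2 free slots; the new job needs 5.
    print(place_slots({"tm-1": 1, "tm-2": 2}, requested=5, slots_per_tm=2))
```

In the example the job takes all 3 free slots and one extra TaskManager is started for the remaining 2. A job that fits entirely into free slots starts no new TaskManager at all.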

Best,
Yang

Piper Piper <[hidden email]> 于2019年11月23日周六 上午8:52写道:
Hello Yang,

Thank you for the explanation!

I want to control the amount of TaskManagers in order to have finer control over allowing/rejecting certain jobs in the cluster.

In Session mode with multiple jobs, is there any way to control whether Flink will fit a new job into empty slots in existing Task Managers versus starting new TaskManagers for every new job?

Thank you,

Piper 

On Thu, Nov 21, 2019 at 10:53 PM Yang Wang <[hidden email]> wrote:
Hi Piper,

Jingsong is right. Both per-job and session cluster, the YarnResourceManager will allocate
taskmanager containers dynamically on demand. 

For per-job cluster, it will allocate taskmanagers base on the job slot demand. The excess
containers will return to yarn immediately. When the job finished, jobmanager and all 
taskmanagers will be released.
For sesion-cluster, the YarnResourceManager will not have any taskmanagers on started.
Once the job is submitted, it will allocate the taskmanagers. When the job finished, the 
taskmanagers will enter into idle and be released after the timeout. The jobmanager will
be long-running unless manually stop the session.

I'm just curious why do you want to control the amounts of taskmanagers. Because they are
always allocated on demand.


Best,
Yang

Piper Piper <[hidden email]> 于2019年11月22日周五 上午11:02写道:
Thank you, I will check it out. 

On Thu, Nov 21, 2019, 9:21 PM Jingsong Li <[hidden email]> wrote:
Hi Piper,

AFAIK, There are no these flexible operations. You can get some information from metrics, but you can not control them.
Maybe you should modify some source code in flink-yarn.

Best,
Jingsong Lee


On Thu, Nov 21, 2019 at 8:17 PM Piper Piper <[hidden email]> wrote:
Hi Jingsong,

Thank you for your reply!

>Is this what you want? Piper.  

Yes. This is exactly what I want.

Is there any way for me to specify to Flink RM how much of resources to ask YARN's RM for, and if we want Flink's RM to ask for resources proactively before it runs out?
Similarly, is there any way I can force the JM to release TM back to YARN before timeout?

Or will I need to modify the source code of Flink for this?

Thank you,

Piper

On Thu, Nov 21, 2019 at 2:17 AM vino yang <[hidden email]> wrote:
Hi Jingsong,

Thanks for the explanation about the mechanism of the new Flink session cluster mode. 

Because I mostly use job cluster mode, so did not have a good knowledge of the new Flink session cluster mode. 

Best,
Vino

Jingsong Li <[hidden email]> 于2019年11月21日周四 下午2:46写道:
Hi Piper and Vino:

Current Flink version, the resources of Flink Session cluster are unrestricted, which means if the requested resources exceed the resources owned by the current session, it will apply to the RM of yarn for new resources.
And if TaskManager is idle for too long, JM will release it to yarn. This behavior is controlled by resourcemanager.taskmanager-timeout . You can set a suitable value for it to enjoy the benefits of reuse process and dynamic resources.

From this point of view, I think session mode is a good choice.
Is this what you want? Piper.

Best,
Jingsong Lee



On Thu, Nov 21, 2019 at 2:25 PM vino yang <[hidden email]> wrote:
Hi Piper,

The understanding of two deploy modes For Flink on Yarn is right.

AFAIK, The single job (job cluster) mode is more popular than Session mode. 

Because job cluster mode, Flink let YARN manage resources as far as possible. And this mode can keep isolation from other jobs.

IMO, we do not need to combine their advantages. Let YARN do the things that it is good at. What do you think?

Best,
Vino


Re: Dynamically creating new Task Managers in YARN

Piper Piper
Hi Yang,

Session mode is working exactly as you described. No exceptions.

Thank you!

Piper


On Sun, Nov 24, 2019 at 11:24 PM Yang Wang <[hidden email]> wrote:
Hi Piper,

In session mode, Flink will always use the free slots in the existing TaskManagers first.
When they cannot fulfill the slot request, new TaskManagers will be started.
Did you run into any exceptions?
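
The "free slots first, then new TaskManagers" rule above can be sketched in a few lines of Python. This is a simplified model under stated assumptions, not Flink's scheduler: plan_slot_request and its parameters are hypothetical names, and slots_per_new_tm plays the role of the taskmanager.numberOfTaskSlots setting.

```python
import math
from typing import List, Tuple


def plan_slot_request(
    free_slots_per_tm: List[int], requested_slots: int, slots_per_new_tm: int
) -> Tuple[int, int]:
    """Return (slots served by existing TMs, number of new TMs to start).

    Free slots on existing TaskManagers are used first; new TaskManagers
    are only requested for the part of the demand that is still missing.
    """
    if requested_slots < 0 or slots_per_new_tm <= 0:
        raise ValueError("requested_slots must be >= 0 and slots_per_new_tm > 0")
    from_existing = min(sum(free_slots_per_tm), requested_slots)
    missing = requested_slots - from_existing
    new_tms = math.ceil(missing / slots_per_new_tm)
    return from_existing, new_tms


if __name__ == "__main__":
    # Two running TMs with 1 and 2 free slots; a new job needs 5 slots.
    print(plan_slot_request([1, 2], requested_slots=5, slots_per_new_tm=2))
    # The job fits entirely into existing free slots, so no new TM is needed.
    print(plan_slot_request([4], requested_slots=3, slots_per_new_tm=2))
```

In this model a new job only triggers a container request when the existing free slots are exhausted, which matches the behavior described for session mode.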

Best,
Yang

Piper Piper <[hidden email]> wrote on Sat, Nov 23, 2019 at 8:52 AM:
Hello Yang,

Thank you for the explanation!

I want to control the number of TaskManagers in order to have finer control over allowing/rejecting certain jobs in the cluster.

In Session mode with multiple jobs, is there any way to control whether Flink will fit a new job into empty slots in existing Task Managers versus starting new TaskManagers for every new job?

Thank you,

Piper 

On Thu, Nov 21, 2019 at 10:53 PM Yang Wang <[hidden email]> wrote:
Hi Piper,

Jingsong is right. For both per-job and session clusters, the YarnResourceManager
allocates TaskManager containers dynamically on demand.

For a per-job cluster, it allocates TaskManagers based on the job's slot demand. Excess
containers are returned to YARN immediately. When the job finishes, the JobManager and all
TaskManagers are released.
For a session cluster, the YarnResourceManager does not have any TaskManagers at startup.
Once a job is submitted, it allocates the TaskManagers. When the job finishes, the
TaskManagers become idle and are released after the timeout. The JobManager is
long-running unless you manually stop the session.

I'm just curious why you want to control the number of TaskManagers, since they are
always allocated on demand.


Best,
Yang

Piper Piper <[hidden email]> wrote on Fri, Nov 22, 2019 at 11:02 AM:
Thank you, I will check it out. 

On Thu, Nov 21, 2019, 9:21 PM Jingsong Li <[hidden email]> wrote:
Hi Piper,

AFAIK, there are no such flexible operations. You can get some information from metrics, but you cannot control them.
You would probably need to modify some source code in flink-yarn.

Best,
Jingsong Lee


On Thu, Nov 21, 2019 at 8:17 PM Piper Piper <[hidden email]> wrote:
Hi Jingsong,

Thank you for your reply!

>Is this what you want? Piper.  

Yes. This is exactly what I want.

Is there any way for me to tell Flink's RM how many resources to ask YARN's RM for, and to have Flink's RM ask for resources proactively before it runs out?
Similarly, is there any way I can force the JM to release a TM back to YARN before the timeout?

Or will I need to modify the source code of Flink for this?

Thank you,

Piper

On Thu, Nov 21, 2019 at 2:17 AM vino yang <[hidden email]> wrote:
Hi Jingsong,

Thanks for the explanation about the mechanism of the new Flink session cluster mode. 

Because I mostly use job cluster mode, I did not have a good knowledge of the new Flink session cluster mode.

Best,
Vino

Jingsong Li <[hidden email]> wrote on Thu, Nov 21, 2019 at 2:46 PM:
Hi Piper and Vino:

In the current Flink version, the resources of a Flink session cluster are unrestricted: if the requested resources exceed those owned by the current session, it will ask YARN's RM for new resources.
And if a TaskManager is idle for too long, the JM will release it back to YARN. This behavior is controlled by resourcemanager.taskmanager-timeout. You can set a suitable value for it to get the benefits of both process reuse and dynamic resources.

From this point of view, I think session mode is a good choice.
Is this what you want, Piper?

Best,
Jingsong Lee


