Flink on Yarn, restart job will not destroy original task manager

Flink on Yarn, restart job will not destroy original task manager

James (Jian Wu) [FDS Data Platform]

Hi,

  I launch a Flink application on YARN with 5 task managers, each with 3 slots (-yn 5 -ys 3), using the following script:

#!/bin/sh

CLASSNAME=$1
JARNAME=$2
ARGUMENTS=$3

export JVM_ARGS="${JVM_ARGS} -Dmill.env.active=aws"

/usr/bin/flink run -m yarn-cluster --parallelism 15 -yn 5 -ys 3 -yjm 8192 -ytm 8192 \
  -ynm flink-order-detection \
  -yD env.java.opts.jobmanager='-Dmill.env.active=aws' \
  -yD env.java.opts.taskmanager='-Dmill.env.active=aws' \
  -c "$CLASSNAME" \
  "$JARNAME" $ARGUMENTS
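For example, I invoke it like this (the script name, class, jar, and argument values are illustrative):

./submit-flink-job.sh com.example.OrderDetectionJob flink-order-detection.jar "--env aws"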

 

 

The original Flink app occupied 5 containers and 15 vcores and ran for 3+ days; then one task manager was killed by YARN because of a memory leak, and the job manager started new task managers. Currently my Flink app is running normally on YARN, but it occupies 10 containers and 28 vcores. (The Application Master shows my Flink job running for 75 hours; clicking into the running job in the Flink web UI shows it running for 28 hours because of the restart.)

In my opinion, the job manager should only restart the failed task manager, so the app should end up still using 5 containers and 15 vcores. Why does the job occupy double the resources after being restarted by YARN?
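For reference, the current allocation can also be checked from the YARN CLI (the application id below is illustrative, and the exact fields printed vary by Hadoop version):

# Per-application report; recent Hadoop versions include aggregate resource usage
yarn application -status application_1535944000000_0012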

 

Can anyone give me some suggestions?

 

Regards,

James


Re: Flink on Yarn, restart job will not destroy original task manager

Gary Yao-2
Hi James,

What version of Flink are you running? In 1.5.0, tasks can spread out due to
changes that were introduced to support "local recovery" [1]. There is a
mitigation in 1.5.1 that prevents the spread-out, but local recovery must be
disabled [2].

Best,
Gary

[1] https://issues.apache.org/jira/browse/FLINK-9635
[2] https://issues.apache.org/jira/browse/FLINK-9634


Re: Flink on Yarn, restart job will not destroy original task manager

James (Jian Wu) [FDS Data Platform]

My Flink version is 1.5.0; I will rebuild with a newer version of Flink.

 

Regards,

James

 


Re: Flink on Yarn, restart job will not destroy original task manager

James (Jian Wu) [FDS Data Platform]

Hi Gary,

From the 1.5/1.6 documentation:

 

Configuring task-local recovery

Task-local recovery is deactivated by default and can be activated through Flink’s configuration with the key state.backend.local-recovery as specified in CheckpointingOptions.LOCAL_RECOVERY. The value for this setting can either be true to enable or false (default) to disable local recovery.

 

By default, local recovery is deactivated, and I have not enabled it in my 1.5.0 deployment.

 

So do I still need to manually disable local recovery via flink-conf.yaml?
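For reference, setting it explicitly in flink-conf.yaml would look like this (a minimal sketch; the key is the one named in the documentation above, and false is already the default):

# flink-conf.yaml: keep task-local recovery disabled (the documented default)
state.backend.local-recovery: false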

 

Regards,

James

From: "James (Jian Wu) [FDS Data Platform]" <[hidden email]>
Date: Monday, September 3, 2018 at 4:13 PM
To: Gary Yao <[hidden email]>
Cc: "[hidden email]" <[hidden email]>
Subject: Re: Flink on Yarn, restart job will not destroy original task manager

 

My Flink version is 1.5, I will rebuild new version flink

 

Regards

 

James

 

From: Gary Yao <[hidden email]>
Date: Monday, September 3, 2018 at 3:57 PM
To: "James (Jian Wu) [FDS Data Platform]" <[hidden email]>
Cc: "[hidden email]" <[hidden email]>
Subject: Re: Flink on Yarn, restart job will not destroy original task manager

 

Hi James,

What version of Flink are you running? In 1.5.0, tasks can spread out due to
changes that were introduced to support "local recovery". There is a
mitigation in 1.5.1 that prevents task spread out but local recovery must be
disabled [2].

Best,
Gary

[1] https://issues.apache.org/jira/browse/FLINK-9635
[2] https://issues.apache.org/jira/browse/FLINK-9634

 

On Mon, Sep 3, 2018 at 9:20 AM, James (Jian Wu) [FDS Data Platform] <[hidden email]> wrote:

Hi:

 

  I launch flink application on yarn with 5 task manager, every task manager has 5 slots with such script

 

#!/bin/sh

CLASSNAME=$1

JARNAME=$2

ARUGMENTS=$3

 

export JVM_ARGS="${JVM_ARGS} -Dmill.env.active=aws"

/usr/bin/flink run -m yarn-cluster --parallelism 15  -yn 5 -ys 3 -yjm 8192 -ytm 8192  -ynm flink-order-detection -yD env.java.opts.jobmanager='-Dmill.env.active=aws'  -yD env.java.opts.taskmanager='-Dmill.env.active=aws'  -c $CLASSNAME   \

$JARNAME $ARUGMENTS

 

 

The original flink app occupy 5 containers and 15 vcores, run for 3+ days, one of task manage killed by yarn because of memory leak and job manager start new task managers. Currently my flink app running normally on yarn,  but occupy 10 containers, 28 vcores. (Application Master shows my flink job running for 75 hours, click into running job in flink web ui, it shows my job running for 28hours because of restart)

 

In my opinion, job manager will attempt to start the failed task manager, and in the final app still use 5 containers and 15 vcores, why after restart job by yarn will occupy double resource.

 

Any one can give me some suggestion?

 

Regards

 

James

 

Reply | Threaded
Open this post in threaded view
|

Re: Flink on Yarn, restart job will not destroy original task manager

Gary Yao-2
Hi James,

Local recovery is disabled by default; you do not need to configure anything further.

Did you run into problems again, or does it work now? If you are still
experiencing task spread-out, can you configure logging on DEBUG level and
share the jobmanager logs with us?
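For example (a sketch assuming the default conf/log4j.properties that Flink ships; the appender name may differ in your setup):

# conf/log4j.properties: raise the root logger from INFO to DEBUG
log4j.rootLogger=DEBUG, file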

Best,
Gary
