Flink app cannot restart

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink app cannot restart

rainieli
Hi Flink help,

I am new to Flink. 
I am investigating one flink app that cannot restart when we lose yarn node manager (tc.yarn.rm.cluster.NumActiveNMs=0), while other flink apps can restart automatically.

Here is job's restartPolicy setting:

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(

1000, org.apache.flink.api.common.time.Time.seconds(30)));


Here is Job Manager log:
2020-07-15 20:26:27,831 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job switched from state RUNNING to FAILING.
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager. This might indicate that the remote task manager was lost.
    at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:136)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
    at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:390)
    at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:355)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1429)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:947)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:826)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:474)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
    at java.lang.Thread.run(Thread.java:748)

Here is some yarn node manager log:
2020-07-15 20:57:11.927858: I tensorflow/cc/saved_model/reader.cc:31] Reading SavedModel from
2020-07-15 20:57:11.928419: I tensorflow/cc/saved_model/reader.cc:54] Reading meta graph with tags
2020-07-15 20:57:11.928923: I tensorflow/core/platform/cpu_feature_guard.cc:141] Your CPU supports instructions that this TensorFlow binary was not compiled to use: SSE4.1 SSE4.2 AVX AVX2 FMA
2020-07-15 20:57:11.935924: I tensorflow/cc/saved_model/loader.cc:162] Restoring SavedModel bundle.
2020-07-15 20:57:11.939271: I tensorflow/cc/saved_model/loader.cc:138] Running MainOp with key saved_model_main_op on SavedModel bundle.
2020-07-15 20:57:11.944583: I tensorflow/cc/saved_model/loader.cc:259] SavedModel load for tags; Status: success. Took 16732 microseconds.
2020-07-15 20:58:13.356004: F tensorflow/core/lib/monitoring/collection_registry.cc:77] Cannot register 2 metrics with the same name: /tensorflow/cc/saved_model/load_attempt_count

Any idea why this app's restartPolicy doesn't work?
Thanks
Best regards
Rainie
Reply | Threaded
Open this post in threaded view
|

Re: Flink app cannot restart

Yang Wang
Could you check for that whether the JobManager is also running on the lost Yarn NodeManager?
If it is the case, you need to configure "yarn.application-attempts" to a value bigger than 1.


BTW, the logs you provided are not Yarn NodeManager logs. And if you could provide the full jobmanager
log, it will help a lot.



Best,
Yang

Rainie Li <[hidden email]> 于2020年7月22日周三 下午3:54写道:
Hi Flink help,

I am new to Flink. 
I am investigating one flink app that cannot restart when we lose yarn node manager (tc.yarn.rm.cluster.NumActiveNMs=0), while other flink apps can restart automatically.

Here is job's restartPolicy setting:

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(

1000, org.apache.flink.api.common.time.Time.seconds(30)));


Here is Job Manager log:
2020-07-15 20:26:27,831 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job switched from state RUNNING to FAILING.
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager. This might indicate that the remote task manager was lost.
    at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:136)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
    at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:390)
    at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:355)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1429)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:947)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:826)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:474)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
    at java.lang.Thread.run(Thread.java:748)

Here is some yarn node manager log:
2020-07-15 20:57:11.927858: I tensorflow/cc/saved_model/reader.cc:31] Reading SavedModel from
2020-07-15 20:57:11.928419: I tensorflow/cc/saved_model/reader.cc:54] Reading meta graph with tags
2020-07-15 20:57:11.928923: I tensorflow/core/platform/cpu_feature_guard.cc:141] Your CPU supports instructions that this TensorFlow binary was not compiled to use: SSE4.1 SSE4.2 AVX AVX2 FMA
2020-07-15 20:57:11.935924: I tensorflow/cc/saved_model/loader.cc:162] Restoring SavedModel bundle.
2020-07-15 20:57:11.939271: I tensorflow/cc/saved_model/loader.cc:138] Running MainOp with key saved_model_main_op on SavedModel bundle.
2020-07-15 20:57:11.944583: I tensorflow/cc/saved_model/loader.cc:259] SavedModel load for tags; Status: success. Took 16732 microseconds.
2020-07-15 20:58:13.356004: F tensorflow/core/lib/monitoring/collection_registry.cc:77] Cannot register 2 metrics with the same name: /tensorflow/cc/saved_model/load_attempt_count

Any idea why this app's restartPolicy doesn't work?
Thanks
Best regards
Rainie
Reply | Threaded
Open this post in threaded view
|

Re: Flink app cannot restart

rainieli
Thank you Yang, I checked "yarn.application-attempts" is already set to 10.
Here is the exception part from job manager log. Full log file is too big, I also reflected it to remove some company specific info. 
Any suggestion to this exception would be appreciated!

2020-07-15 20:04:52,265 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 490 @ 1223
 for job 3a5aca9433cad1b6caa1b11227b9aa4a.
2020-07-15 20:04:55,987 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 490 for job 39393993 (3886147 bytes in 3736 ms).
2020-07-15 20:09:41,317 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - (137/240) (39393993) switched from RUNNING to FAILED on container_e01_id @ cluster name (dataPort=43743).
java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_e01_id timed out.
at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1149)
at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2020-07-15 20:09:41,324 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job name (job id) switched from state RUNNING to FAILING.
java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_e01_id timed out.
at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1149)
at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2020-07-15 20:09:41,330 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source  (1/60) switched from RUNNING to CANCELING.

Best regards
Rainie

On Wed, Jul 22, 2020 at 7:19 PM Yang Wang <[hidden email]> wrote:
Could you check for that whether the JobManager is also running on the lost Yarn NodeManager?
If it is the case, you need to configure "yarn.application-attempts" to a value bigger than 1.


BTW, the logs you provided are not Yarn NodeManager logs. And if you could provide the full jobmanager
log, it will help a lot.



Best,
Yang

Rainie Li <[hidden email]> 于2020年7月22日周三 下午3:54写道:
Hi Flink help,

I am new to Flink. 
I am investigating one flink app that cannot restart when we lose yarn node manager (tc.yarn.rm.cluster.NumActiveNMs=0), while other flink apps can restart automatically.

Here is job's restartPolicy setting:

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(

1000, org.apache.flink.api.common.time.Time.seconds(30)));


Here is Job Manager log:
2020-07-15 20:26:27,831 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job switched from state RUNNING to FAILING.
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager. This might indicate that the remote task manager was lost.
    at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:136)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
    at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:390)
    at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:355)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1429)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:947)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:826)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:474)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
    at java.lang.Thread.run(Thread.java:748)

Here is some yarn node manager log:
2020-07-15 20:57:11.927858: I tensorflow/cc/saved_model/reader.cc:31] Reading SavedModel from
2020-07-15 20:57:11.928419: I tensorflow/cc/saved_model/reader.cc:54] Reading meta graph with tags
2020-07-15 20:57:11.928923: I tensorflow/core/platform/cpu_feature_guard.cc:141] Your CPU supports instructions that this TensorFlow binary was not compiled to use: SSE4.1 SSE4.2 AVX AVX2 FMA
2020-07-15 20:57:11.935924: I tensorflow/cc/saved_model/loader.cc:162] Restoring SavedModel bundle.
2020-07-15 20:57:11.939271: I tensorflow/cc/saved_model/loader.cc:138] Running MainOp with key saved_model_main_op on SavedModel bundle.
2020-07-15 20:57:11.944583: I tensorflow/cc/saved_model/loader.cc:259] SavedModel load for tags; Status: success. Took 16732 microseconds.
2020-07-15 20:58:13.356004: F tensorflow/core/lib/monitoring/collection_registry.cc:77] Cannot register 2 metrics with the same name: /tensorflow/cc/saved_model/load_attempt_count

Any idea why this app's restartPolicy doesn't work?
Thanks
Best regards
Rainie
Reply | Threaded
Open this post in threaded view
|

Re: Flink app cannot restart

rmetzger0
Hi Rainie,

I believe we need the full JobManager log to understand what's going on with your job. The logs you've provided so far only tell us that a TaskManager has died (which is expected, when a node goes down). What is interesting to see is what's happening next: are we having enough resources to restart the job? is there some issue restarting it?

If you feel uncomfortable sharing the full logs on a public mailing list, feel free to send the logs just to Yang Wang and/or me.

Best,
Robert


On Thu, Jul 23, 2020 at 9:18 AM Rainie Li <[hidden email]> wrote:
Thank you Yang, I checked "yarn.application-attempts" is already set to 10.
Here is the exception part from job manager log. Full log file is too big, I also reflected it to remove some company specific info. 
Any suggestion to this exception would be appreciated!

2020-07-15 20:04:52,265 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 490 @ 1223
 for job 3a5aca9433cad1b6caa1b11227b9aa4a.
2020-07-15 20:04:55,987 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 490 for job 39393993 (3886147 bytes in 3736 ms).
2020-07-15 20:09:41,317 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - (137/240) (39393993) switched from RUNNING to FAILED on container_e01_id @ cluster name (dataPort=43743).
java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_e01_id timed out.
at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1149)
at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2020-07-15 20:09:41,324 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job name (job id) switched from state RUNNING to FAILING.
java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_e01_id timed out.
at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1149)
at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2020-07-15 20:09:41,330 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source  (1/60) switched from RUNNING to CANCELING.

Best regards
Rainie

On Wed, Jul 22, 2020 at 7:19 PM Yang Wang <[hidden email]> wrote:
Could you check for that whether the JobManager is also running on the lost Yarn NodeManager?
If it is the case, you need to configure "yarn.application-attempts" to a value bigger than 1.


BTW, the logs you provided are not Yarn NodeManager logs. And if you could provide the full jobmanager
log, it will help a lot.



Best,
Yang

Rainie Li <[hidden email]> 于2020年7月22日周三 下午3:54写道:
Hi Flink help,

I am new to Flink. 
I am investigating one flink app that cannot restart when we lose yarn node manager (tc.yarn.rm.cluster.NumActiveNMs=0), while other flink apps can restart automatically.

Here is job's restartPolicy setting:

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(

1000, org.apache.flink.api.common.time.Time.seconds(30)));


Here is Job Manager log:
2020-07-15 20:26:27,831 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job switched from state RUNNING to FAILING.
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager. This might indicate that the remote task manager was lost.
    at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:136)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
    at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:390)
    at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:355)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1429)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:947)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:826)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:474)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
    at java.lang.Thread.run(Thread.java:748)

Here is some yarn node manager log:
2020-07-15 20:57:11.927858: I tensorflow/cc/saved_model/reader.cc:31] Reading SavedModel from
2020-07-15 20:57:11.928419: I tensorflow/cc/saved_model/reader.cc:54] Reading meta graph with tags
2020-07-15 20:57:11.928923: I tensorflow/core/platform/cpu_feature_guard.cc:141] Your CPU supports instructions that this TensorFlow binary was not compiled to use: SSE4.1 SSE4.2 AVX AVX2 FMA
2020-07-15 20:57:11.935924: I tensorflow/cc/saved_model/loader.cc:162] Restoring SavedModel bundle.
2020-07-15 20:57:11.939271: I tensorflow/cc/saved_model/loader.cc:138] Running MainOp with key saved_model_main_op on SavedModel bundle.
2020-07-15 20:57:11.944583: I tensorflow/cc/saved_model/loader.cc:259] SavedModel load for tags; Status: success. Took 16732 microseconds.
2020-07-15 20:58:13.356004: F tensorflow/core/lib/monitoring/collection_registry.cc:77] Cannot register 2 metrics with the same name: /tensorflow/cc/saved_model/load_attempt_count

Any idea why this app's restartPolicy doesn't work?
Thanks
Best regards
Rainie