Job goes down after 1/145 TMs is lost (NoResourceAvailableException)

Job goes down after 1/145 TMs is lost (NoResourceAvailableException)

Subramanya Suresh

Hi, we are seeing a weird issue where one TaskManager is lost and then never re-allocated. Subsequently, operators fail with NoResourceAvailableException, and after 5 restarts (we have a fixed-delay restart strategy with 5 attempts) the application goes down.

  • We have explicitly set yarn.reallocate-failed: true and have not specified yarn.maximum-failed-containers (and we see “org.apache.flink.yarn.YarnApplicationMasterRunner - YARN application tolerates 145 failed TaskManager containers before giving up” in the logs).
  • After the initial startup, where all 145 TaskManagers are requested, I never see any log line saying “Requesting new TaskManager container” to reallocate the failed container. (The relevant settings are sketched below.)
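
For reference, a minimal sketch of the flink-conf.yaml entries involved (illustrative, not our exact file; the 10 s delay is taken from the FixedDelayRestartStrategy(maxNumberRestartAttempts=5, delayBetweenRestartAttempts=10000) line further down in the JM log, and the restart strategy could equally be set programmatically instead of in the config file):

yarn.reallocate-failed: true
# yarn.maximum-failed-containers is not set; it defaults to the number of
# requested TaskManager containers (145 here, per the ApplicationMaster log line above)
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 5
restart-strategy.fixed-delay.delay: 10 s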


2018-08-29 23:13:56,655 WARN  akka.remote.RemoteWatcher                                     - Detected unreachable: [akka.tcp://flink@...:123]

2018-08-29 23:13:57,364 INFO  org.apache.flink.yarn.YarnJobManager                          - Task manager akka.tcp://flink@...:123/user/taskmanager terminated.

java.lang.Exception: TaskManager was lost/killed: container_e27_1535135887442_0906_01_000039 @ blahabc.sfdc.net (dataPort=124)
    at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
    at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)
    at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
    at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
    at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:107)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
    at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)
    at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)
    at akka.actor.ActorCell.invoke(ActorCell.scala:494)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

java.lang.Exception: TaskManager was lost/killed: container_e27_1535135887442_0906_01_000039 @ blahabc.sfdc.net:42414 (dataPort=124)
    at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
    at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)
    at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
    at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
    at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:107)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
    at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)
    at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)
    at akka.actor.ActorCell.invoke(ActorCell.scala:494)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

2018-08-29 23:13:58,529 INFO  org.apache.flink.runtime.instance.InstanceManager             - Unregistered task manager blahabc.sfdc.net:/1.1.1.1. Number of registered task managers 144. Number of available slots 720.

2018-08-29 23:13:58,645 INFO  org.apache.flink.yarn.YarnJobManager                          - Received message for non-existing checkpoint 1

2018-08-29 23:14:39,969 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Recovering checkpoints from ZooKeeper.

2018-08-29 23:14:39,975 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Found 0 checkpoints in ZooKeeper.

2018-08-29 23:14:39,975 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Trying to fetch 0 checkpoints from storage.

org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration. Task to schedule: < Attempt #1


After 5 retries of our SQL query execution graph (we have configured a fixed-delay restart strategy with 5 attempts), it outputs the below:


2018-08-29 23:15:22,216 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Stopping checkpoint coordinator for job ab7e96a1659fbb4bfb3c6cd9bccb0335

2018-08-29 23:15:22,216 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Shutting down

2018-08-29 23:15:22,225 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Removing /prod/link/application_1535135887442_0906/checkpoints/ab7e96a1659fbb4bfb3c6cd9bccb0335 from ZooKeeper

2018-08-29 23:15:23,460 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter  - Shutting down.

2018-08-29 23:15:23,460 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter  - Removing /checkpoint-counter/ab7e96a1659fbb4bfb3c6cd9bccb0335 from ZooKeeper

2018-08-29 23:15:24,114 INFO  org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  - Removed job graph ab7e96a1659fbb4bfb3c6cd9bccb0335 from ZooKeeper.

2018-08-29 23:15:26,126 INFO  org.apache.flink.yarn.YarnJobManager                          - Job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335 is in terminal state FAILED. Shutting down session

2018-08-29 23:15:26,128 INFO  org.apache.flink.yarn.YarnJobManager                          - Stopping JobManager with final application status FAILED and diagnostics: The monitored job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.

2018-08-29 23:15:26,129 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Shutting down cluster with status FAILED : The monitored job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.

2018-08-29 23:15:26,146 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Unregistering application from the YARN Resource Manager

2018-08-29 23:15:27,997 INFO  org.apache.flink.runtime.history.FsJobArchivist               - Job ab7e96a1659fbb4bfb3c6cd9bccb0335 has been archived at hdfs:/savedSearches/prod/completed-jobs/ab7e96a1659fbb4bfb3c6cd9bccb0335.



Cheers, 
Re: Job goes down after 1/145 TMs is lost (NoResourceAvailableException)

Till Rohrmann
Hi Subramanya,

in order to help you I need a little bit of context. Which version of Flink are you running? The configuration yarn.reallocate-failed has been deprecated since Flink 1.1 and no longer has any effect.

It would be helpful to get the full JobManager log from you. If the YarnFlinkResourceManager gets notified that a container has failed, it should restart this container (it will do this up to 145 times). So if the YarnFlinkResourceManager does not get notified about a completed container, this might indicate that the container is still running. It would therefore be good to check what the logs of container_e27_1535135887442_0906_01_000039 say.
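
For completeness, the container logs can usually be pulled with the YARN CLI; a sketch using the IDs from your mail (depending on your Hadoop version you may also need to pass -nodeAddress for a container that ran on a specific node):

yarn logs -applicationId application_1535135887442_0906 \
    -containerId container_e27_1535135887442_0906_01_000039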

Moreover, do you see the same problem occur when using the latest release Flink 1.6.0?

Cheers,
Till

Re: Job goes down after 1/145 TMs is lost (NoResourceAvailableException)

Subramanya Suresh
Hi Till, 
Greatly appreciate your reply. 
We use version 1.4.2. I do not see anything unusual in the logs for the TM that was lost. Note: I have looked at many such failures and see the same pattern below.

The JM logs above had most of what I had, but below is what I get when I search for flink.yarn (our logs are huge otherwise, given the number of SQL queries we run). The gist: Akka detects the TM as unreachable, the TM is marked lost and unregistered by the JM, operators start failing with NoResourceAvailableException since there is one fewer TM, and 5 retry attempts later the job goes down.

………….

2018-08-29 23:02:41,216 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager container_e27_1535135887442_0906_01_000124 has started.

2018-08-29 23:02:50,095 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager container_e27_1535135887442_0906_01_000159 has started.

2018-08-29 23:02:50,409 INFO  org.apache.flink.yarn.YarnJobManager                          - Submitting job ab7e96a1659fbb4bfb3c6cd9bccb0335 (streaming-searches-prod).

2018-08-29 23:02:50,429 INFO  org.apache.flink.yarn.YarnJobManager                          - Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=5, delayBetweenRestartAttempts=10000) for ab7e96a1659fbb4bfb3c6cd9bccb0335.

2018-08-29 23:02:50,486 INFO  org.apache.flink.yarn.YarnJobManager                          - Running initialization on master for job streaming-searches-prod (ab7e96a1659fbb4bfb3c6cd9bccb0335).

2018-08-29 23:02:50,487 INFO  org.apache.flink.yarn.YarnJobManager                          - Successfully ran initialization on master in 0 ms.

2018-08-29 23:02:50,684 INFO  org.apache.flink.yarn.YarnJobManager                          - Using application-defined state backend for checkpoint/savepoint metadata: File State Backend @ hdfs://security-temp/savedSearches/checkpoint.

2018-08-29 23:02:50,920 INFO  org.apache.flink.yarn.YarnJobManager                          - Scheduling job ab7e96a1659fbb4bfb3c6cd9bccb0335 (streaming-searches-prod).

2018-08-29 23:12:05,240 INFO  org.apache.flink.yarn.YarnJobManager                          - Attempting to recover all jobs.

2018-08-29 23:12:05,716 INFO  org.apache.flink.yarn.YarnJobManager                          - There are 1 jobs to recover. Starting the job recovery.

2018-08-29 23:12:05,806 INFO  org.apache.flink.yarn.YarnJobManager                          - Attempting to recover job ab7e96a1659fbb4bfb3c6cd9bccb0335.

2018-08-29 23:12:07,308 INFO  org.apache.flink.yarn.YarnJobManager                          - Ignoring job recovery for ab7e96a1659fbb4bfb3c6cd9bccb0335, because it is already submitted.

2018-08-29 23:13:57,364 INFO  org.apache.flink.yarn.YarnJobManager                          - Task manager akka.tcp://flink@blahabc.sfdc.net:123/user/taskmanager terminated.

at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)

at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)

2018-08-29 23:13:58,645 INFO  org.apache.flink.yarn.YarnJobManager                          - Received message for non-existing checkpoint 1

2018-08-29 23:15:26,126 INFO  org.apache.flink.yarn.YarnJobManager                          - Job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335 is in terminal state FAILED. Shutting down session

2018-08-29 23:15:26,128 INFO  org.apache.flink.yarn.YarnJobManager                          - Stopping JobManager with final application status FAILED and diagnostics: The monitored job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.

2018-08-29 23:15:26,129 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Shutting down cluster with status FAILED : The monitored job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.

2018-08-29 23:15:26,146 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Unregistering application from the YARN Resource Manager

2018-08-29 23:15:31,363 INFO  org.apache.flink.yarn.YarnJobManager                          - Deleting yarn application files under hdfs://security-temp/user/sec-data-app/.flink/application_1535135887442_0906.

2018-08-29 23:15:31,370 INFO  org.apache.flink.yarn.YarnJobManager                          - Stopping JobManager akka.tcp://flink@blahxyz.sfdc.net:1235/user/jobmanager.

2018-08-29 23:15:41,225 ERROR org.apache.flink.yarn.YarnJobManager                          - Actor system shut down timed out.

2018-08-29 23:15:41,226 INFO  org.apache.flink.yarn.YarnJobManager                          - Shutdown completed. Stopping JVM.




Re: Job goes down after 1/145 TMs is lost (NoResourceAvailableException)

Till Rohrmann
Could you check whether akka.tcp://flink@...:123/user/taskmanager is still running, e.g. whether it tries to reconnect to the JobManager? If this is the case, then the container is still running and the YarnFlinkResourceManager thinks that everything is alright. You can make a TaskManager kill itself when it gets quarantined by setting `taskmanager.exit-on-fatal-akka-error: true` in the `flink-conf.yaml`.
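
In flink-conf.yaml that would look like the line below; the commented akka.watch.* keys are only a pointer to the related death-watch tuning options, not something this fix requires:

taskmanager.exit-on-fatal-akka-error: true
# related death-watch tuning (optional): akka.watch.heartbeat.interval,
# akka.watch.heartbeat.pause, akka.watch.threshold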

Cheers,
Till


Re: Job goes down after 1/145 TMs is lost (NoResourceAvailableException)

Subramanya Suresh
Thanks Till, 
I do not see any Akka-related messages in that TaskManager's log after the initial startup; it seemed like all was well. After the RemoteWatcher detects it as unreachable and the JobManager unregisters the TaskManager, I do not see any other activity in the JobManager with regard to reallocation etc. 
- Does the quarantining of the TaskManager not happen until the exit-on-fatal-akka-error flag is turned on? 
- Does the JobManager or the TaskManager try to reconnect to the other again? Is there a different setting for that?
- Does the JobManager not reallocate a TaskManager that has been unregistered until the old TaskManager exits? I think it should, especially if it is not trying to establish a connection again. 

I will give the flag a try. 

Sincerely, 



at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)

at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)

at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)

at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)

at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)

at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)

at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)

at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)

at org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:107)

at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)

at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)

at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)

at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)

at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)

at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)

at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)

at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)

at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)

at akka.actor.Actor$class.aroundReceive(Actor.scala:502)

at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)

at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)

at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)

at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)

at akka.actor.ActorCell.invoke(ActorCell.scala:494)

at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)

at akka.dispatch.Mailbox.run(Mailbox.scala:224)

at akka.dispatch.Mailbox.exec(Mailbox.scala:234)

at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

java.lang.Exception: TaskManager was lost/killed: container_e27_1535135887442_0906_01_000039 @ blahabc.sfdc.net:42414 (dataPort=124)

at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)

at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)

at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)

at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)

at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)

at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)

at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)

at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)

at org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:107)

at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)

at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)

at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)

at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)

at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)

at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)

at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)

at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)

at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)

at akka.actor.Actor$class.aroundReceive(Actor.scala:502)

at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)

at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)

at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)

at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)

at akka.actor.ActorCell.invoke(ActorCell.scala:494)

at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)

at akka.dispatch.Mailbox.run(Mailbox.scala:224)

at akka.dispatch.Mailbox.exec(Mailbox.scala:234)

at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

2018-08-29 23:13:58,529 INFO  org.apache.flink.runtime.instance.InstanceManager             - Unregistered task manager blahabc.sfdc.net:/1.1.1.1. Number of registered task managers 144. Number of available slots 720.

2018-08-29 23:13:58,645 INFO  org.apache.flink.yarn.YarnJobManager                          - Received message for non-existing checkpoint 1

2018-08-29 23:14:39,969 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Recovering checkpoints from ZooKeeper.

2018-08-29 23:14:39,975 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Found 0 checkpoints in ZooKeeper.

2018-08-29 23:14:39,975 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Trying to fetch 0 checkpoints from storage.

org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration. Task to schedule: < Attempt #1


After 5 retries of our SQL query execution graph (we have configured 5 fixed-delay restarts), it outputs the below:


2018-08-29 23:15:22,216 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Stopping checkpoint coordinator for job ab7e96a1659fbb4bfb3c6cd9bccb0335

2018-08-29 23:15:22,216 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Shutting down

2018-08-29 23:15:22,225 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Removing /prod/link/application_1535135887442_0906/checkpoints/ab7e96a1659fbb4bfb3c6cd9bccb0335 from ZooKeeper

2018-08-29 23:15:23,460 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter  - Shutting down.

2018-08-29 23:15:23,460 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter  - Removing /checkpoint-counter/ab7e96a1659fbb4bfb3c6cd9bccb0335 from ZooKeeper

2018-08-29 23:15:24,114 INFO  org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  - Removed job graph ab7e96a1659fbb4bfb3c6cd9bccb0335 from ZooKeeper.

2018-08-29 23:15:26,126 INFO  org.apache.flink.yarn.YarnJobManager                          - Job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335 is in terminal state FAILED. Shutting down session

2018-08-29 23:15:26,128 INFO  org.apache.flink.yarn.YarnJobManager                          - Stopping JobManager with final application status FAILED and diagnostics: The monitored job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.

2018-08-29 23:15:26,129 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Shutting down cluster with status FAILED : The monitored job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.

2018-08-29 23:15:26,146 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Unregistering application from the YARN Resource Manager

2018-08-29 23:15:27,997 INFO  org.apache.flink.runtime.history.FsJobArchivist               - Job ab7e96a1659fbb4bfb3c6cd9bccb0335 has been archived at hdfs:/savedSearches/prod/completed-jobs/ab7e96a1659fbb4bfb3c6cd9bccb0335.



Cheers, 





Re: Job goes down after 1/145 TMs is lost (NoResourceAvailableException)

Subramanya Suresh
Hi, 
With some additional research, 

Before the flag
I realized that for failed containers (those that exited with a specific exit status) we were still requesting a new TM container and launching a TM. But for the "Detected unreachable: [akka.tcp://flink@blahabc.sfdc.net:123]" issue I do not see the container marked as failed, nor a subsequent request for a new TM. 

After `taskmanager.exit-on-fatal-akka-error: true`
I have not seen any "Detected unreachable: [akka.tcp://flink@blahabc.sfdc.net:123]" messages since we turned on `taskmanager.exit-on-fatal-akka-error: true`; I am not sure whether that is a coincidence or a direct result of this change. 

Our Issue:
I realized we are still exiting the application, i.e. failing, when containers are lost. The reason is that before org.apache.flink.yarn.YarnFlinkResourceManager is able to acquire a new container, launch a TM in it, and have it reported as started, org.apache.flink.runtime.jobmanager.scheduler throws a NoResourceAvailableException, which counts as a failure. In our case we had a fixed-delay restart strategy with 5 attempts, and we run out of them because of this. I am looking to solve this with a FailureRateRestartStrategy over a 2 minute interval (10 second restart delay, >12 failures), which gives the TM time to come back (that takes about 50 seconds). 
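
Roughly, that strategy would be configured like this in flink-conf.yaml (these are the standard restart-strategy keys; the thresholds are just the ones described above, not tuned values):

restart-strategy: failure-rate
# tolerate up to 12 failures within a 2 minute window before giving up
restart-strategy.failure-rate.max-failures-per-interval: 12
restart-strategy.failure-rate.failure-rate-interval: 2 min
restart-strategy.failure-rate.delay: 10 s

The programmatic equivalent should be env.setRestartStrategy(RestartStrategies.failureRateRestart(12, Time.minutes(2), Time.seconds(10))).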

Flink Bug
But I cannot help but wonder why there is no interaction between the ResourceManager and the JobManager here, i.e. why does the JobManager continue scheduling despite not having the required TMs? 

Logs to substantiate what I said above (only the relevant lines). 

2018-09-03 06:17:13,932 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Container container_e28_1535135887442_1381_01_000097 completed successfully with diagnostics: Container released by application


2018-09-03 06:34:19,214 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@...:46219] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
2018-09-03 06:34:19,214 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Container container_e27_1535135887442_1381_01_000102 failed. Exit status: -102
2018-09-03 06:34:19,215 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Diagnostics for container container_e27_1535135887442_1381_01_000102 in state COMPLETE : exitStatus=-102 diagnostics=Container preempted by scheduler
2018-09-03 06:34:19,215 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Total number of failed containers so far: 1
2018-09-03 06:34:19,215 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Requesting new TaskManager container with 20000 megabytes memory. Pending requests: 1
2018-09-03 06:34:19,216 INFO  org.apache.flink.yarn.YarnJobManager                          - Task manager akka.tcp://flink@...:46219/user/taskmanager terminated.
2018-09-03 06:34:19,218 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from state RUNNING to FAILING.
java.lang.Exception: TaskManager was lost/killed: container_e27_1535135887442_1381_01_000102 @ hello-world9-27-crz.ops.sfdc.net (dataPort=40423)
2018-09-03 06:34:19,466 INFO  org.apache.flink.runtime.instance.InstanceManager             - Unregistered task manager hello-world9-27-crz.ops.sfdc.net/11.11.35.220. Number of registered task managers 144. Number of available slots 720

2018-09-03 06:34:24,717 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Received new container: container_e28_1535135887442_1381_01_000147 - Remaining pending container requests: 0
2018-09-03 06:34:24,717 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Launching TaskManager in container ContainerInLaunch @ 1535956464717: Container: [ContainerId: container_e28_1535135887442_1381_01_000147, NodeId: hello-world9-27-crz.ops.sfdc.net:8041, NodeHttpAddress: hello-world9-27-crz.ops.sfdc.net:8042, Resource: <memory:20480, vCores:5>, Priority: 0, Token: Token { kind: ContainerToken, service: 11.11.35.220:8041 }, ] on host hello-world9-27-crz.ops.sfdc.net
2018-09-03 06:34:29,256 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@...:46219] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@...:46219]] Caused by: [Connection refused: hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219]
2018-09-03 06:34:34,284 WARN  akka.remote.transport.netty.NettyTransport                    - Remote connection to [null] failed with java.net.ConnectException: Connection refused: hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219
2018-09-03 06:34:34,284 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@...:46219] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@...:46219]] Caused by: [Connection refused: hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219]
2018-09-03 06:34:34,540 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from state RESTARTING to CREATED.

2018-09-03 06:34:34,284 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@...:46219] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@...:46219]] Caused by: [Connection refused: hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219]
2018-09-03 06:34:34,540 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from state RESTARTING to CREATED.
2018-09-03 06:34:35,044 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from state CREATED to RUNNING.
2018-09-03 06:34:35,195 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from state RUNNING to FAILING.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration. Task to schedule: < Attempt #3 (Source: Custom Source -> (Map -> where: (AND(OR(=(UPPER(RuleMatch), _UTF-16LE'BRO'), =(UPPER(RuleMatch), _UTF-16LE'PAN'), =(UPPER(RuleMatch), _UTF-16LE'OLYMPUS'), =(UPPER(RuleMatch), _UTF-16LE'BIT9'), =(UPPER(RuleMatch), _UTF-16LE'OSQUERY'), =(UPPER(RuleMatch), _UTF-16LE'ILLUMIO')), OR(cidrMatch(IPSource, _UTF-16LE'10.0.0.0/8'), cidrMatch(IPSource, _UTF-16LE'192.168.0.0/16')), IS NULL(csvLookup(_UTF-16LE'a353A000000jGZb_whitelist.csv', _UTF




2018-09-03 06:34:39,248 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager container_e28_1535135887442_1381_01_000147 has started.
2018-09-03 06:34:45,235 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Container container_e28_1535135887442_1381_01_000147 failed. Exit status: -102
2018-09-03 06:34:45,235 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Diagnostics for container container_e28_1535135887442_1381_01_000147 in state COMPLETE : exitStatus=-102 diagnostics=Container preempted by scheduler
2018-09-03 06:34:45,236 INFO  org.apache.flink.yarn.YarnJobManager                          - Task manager akka.tcp://flink@...:41966/user/taskmanager terminated.
2018-09-03 06:34:45,236 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Total number of failed containers so far: 2
2018-09-03 06:34:45,236 INFO  org.apache.flink.runtime.instance.InstanceManager             - Unregistered task manager hello-world9-27-crz.ops.sfdc.net/11.11.35.220. Number of registered task managers 144. Number of available slots 720.

2018-09-03 06:34:45,236 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Received new container: container_e28_1535135887442_1381_01_000202 - Remaining pending container requests: 0
2018-09-03 06:34:45,236 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Launching TaskManager in container ContainerInLaunch @ 1535956485236: Container: [ContainerId: container_e28_1535135887442_1381_01_000202, NodeId: hello-world4-31-crz.ops.sfdc.net:8041, NodeHttpAddress: hello-world4-31-crz.ops.sfdc.net:8042, Resource: <memory:20480, vCores:5>, Priority: 0, Token: Token { kind: ContainerToken, service: 11.11.34.160:8041 }, ] on host hello-world4-31-crz.ops.sfdc.net
2018-09-03 06:34:45,241 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Received new container: container_e28_1535135887442_1381_01_000203 - Remaining pending container requests: 0
2018-09-03 06:34:45,241 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Returning excess container container_e28_1535135887442_1381_01_000203

Notice there is no "TaskManager container_e28_1535135887442_1381_01_000202 has started" message. Instead I see:
2018-09-03 06:34:56,894 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from state FAILING to FAILED.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration. Task to schedule: < Attempt #5 (Source: Custom Source -> (Map -> where: (AND(OR(=(UPPER(RuleMatch), _UTF-16LE'BRO'), =(UPPER(RuleMatch), _UTF-16LE'PAN'), =(UPPER(RuleMatch), _UTF-16LE'OLYMPUS'), =(UPPER(RuleMatch), _UTF-16LE'BIT9'), =(UPPER(R


2018-09-03 06:34:57,005 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Shutting down cluster with status FAILED : The monitored job with ID 96d3b4f60a80a898f44f87c5b06f6981 has failed to complete.
2018-09-03 06:34:57,007 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Unregistering application from the YARN Resource Manager




On Fri, Aug 31, 2018 at 4:00 PM, Subramanya Suresh <[hidden email]> wrote:
Thanks Till, 
I do not see any Akka-related messages in that TaskManager after the initial startup; it seemed like all was well. So after the RemoteWatcher detects it as unreachable and the TaskManager is unregistered, I do not see any other activity in the JobManager with regards to reallocation etc. 
- Does the quarantining of the TaskManager not happen until exit-on-fatal-akka-error is turned on?
- Does the JobManager or the TaskManager try to reconnect to the other again? Is there a different setting for that?
- Does the JobManager not reallocate a TaskManager, despite it being unregistered, until the TaskManager exits? I think it should, especially if it is not trying to establish a connection again. 

I will give the flag a try. 

Sincerely, 




Re: Job goes down after 1/145 TMs is lost (NoResourceAvailableException)

Till Rohrmann
Hi Subramanya,

if the container is still running and the TM simply cannot connect to the JobManager, then the ResourceManager does not see a problem. The RM thinks in terms of containers, and as long as n containers are running it won't start new ones. That's the reason why the TM should exit in order to terminate the container.

Have you tried using a newer Flink version? Lately we have reworked a good part of Flink's distributed architecture and added resource elasticity (starting with Flink 1.5).

Cheers,
Till

On Thu, Aug 30, 2018 at 8:48 AM Subramanya Suresh <[hidden email]> wrote:

Hi, we are seeing a weird issue where one TaskManager is lost and then never re-allocated and subsequently operators fail with NoResourceAvailableException and after 5 restarts (we have FixedDelay restarts of 5) the application goes down. 

  • We have explicitly set yarn.reallocate-failed: true and have not specified the yarn.maximum-failed-containers (and see “org.apache.flink.yarn.YarnApplicationMasterRunner             - YARN application tolerates 145 failed TaskManager containers before giving up” in the logs). 
  • After the initial startup where all 145 TaskManagers are requested I never see any logs saying “Requesting new TaskManager container” to reallocate failed container. 


2018-08-29 23:13:56,655 WARN  akka.remote.RemoteWatcher                                     - Detected unreachable: [akka.tcp://flink@...:123]

2018-08-29 23:13:57,364 INFO  org.apache.flink.yarn.YarnJobManager                          - Task manager akka.tcp://flink@...:123/user/taskmanager terminated.

java.lang.Exception: TaskManager was lost/killed: container_e27_1535135887442_0906_01_000039 @ blahabc.sfdc.net (dataPort=124)

at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)

at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)

at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)

at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)

at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)

at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)

at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)

at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)

at org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:107)

at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)

at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)

at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)

at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)

at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)

at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)

at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)

at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)

at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)

at akka.actor.Actor$class.aroundReceive(Actor.scala:502)

at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)

at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)

at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)

at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)

at akka.actor.ActorCell.invoke(ActorCell.scala:494)

at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)

at akka.dispatch.Mailbox.run(Mailbox.scala:224)

at akka.dispatch.Mailbox.exec(Mailbox.scala:234)

at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

java.lang.Exception: TaskManager was lost/killed: container_e27_1535135887442_0906_01_000039 @ blahabc.sfdc.net:42414 (dataPort=124)

at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)

at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)

at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)

at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)

at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)

at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)

at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)

at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)

at org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:107)

at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)

at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)

at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)

at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)

at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)

at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)

at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)

at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)

at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)

at akka.actor.Actor$class.aroundReceive(Actor.scala:502)

at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)

at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)

at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)

at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)

at akka.actor.ActorCell.invoke(ActorCell.scala:494)

at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)

at akka.dispatch.Mailbox.run(Mailbox.scala:224)

at akka.dispatch.Mailbox.exec(Mailbox.scala:234)

at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

2018-08-29 23:13:58,529 INFO  org.apache.flink.runtime.instance.InstanceManager             - Unregistered task manager blahabc.sfdc.net:/1.1.1.1. Number of registered task managers 144. Number of available slots 720.

2018-08-29 23:13:58,645 INFO  org.apache.flink.yarn.YarnJobManager                          - Received message for non-existing checkpoint 1

2018-08-29 23:14:39,969 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Recovering checkpoints from ZooKeeper.

2018-08-29 23:14:39,975 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Found 0 checkpoints in ZooKeeper.

2018-08-29 23:14:39,975 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Trying to fetch 0 checkpoints from storage.

org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration. Task to schedule: < Attempt #1


After 5 retries of our Sql query execution graph (we have configured 5 fixed delay restart), it outputs the below, 


2018-08-29 23:15:22,216 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Stopping checkpoint coordinator for job ab7e96a1659fbb4bfb3c6cd9bccb0335

2018-08-29 23:15:22,216 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Shutting down

2018-08-29 23:15:22,225 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Removing /prod/link/application_1535135887442_0906/checkpoints/ab7e96a1659fbb4bfb3c6cd9bccb0335 from ZooKeeper

2018-08-29 23:15:23,460 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter  - Shutting down.

2018-08-29 23:15:23,460 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter  - Removing /checkpoint-counter/ab7e96a1659fbb4bfb3c6cd9bccb0335 from ZooKeeper

2018-08-29 23:15:24,114 INFO  org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  - Removed job graph ab7e96a1659fbb4bfb3c6cd9bccb0335 from ZooKeeper.

2018-08-29 23:15:26,126 INFO  org.apache.flink.yarn.YarnJobManager                          - Job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335 is in terminal state FAILED. Shutting down session

2018-08-29 23:15:26,128 INFO  org.apache.flink.yarn.YarnJobManager                          - Stopping JobManager with final application status FAILED and diagnostics: The monitored job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.

2018-08-29 23:15:26,129 INFO  org.apache.flink.yarn.YarnFlinkResourceManager


Re: Job goes down after 1/145 TMs is lost (NoResourceAvailableException)

Subramanya Suresh
Hi Till,
After `taskmanager.exit-on-fatal-akka-error: true`
I have not seen any "Detected unreachable: [akka.tcp://flink@....net:123]" messages since we turned on `taskmanager.exit-on-fatal-akka-error: true`; I am not sure whether that is a coincidence or a direct effect of the change. 

Quick update: I can confirm our issue still happens even with the flag set to true. With a proper restart strategy (rate based, so it has enough time) the job can recover from container failures like the first case below, but it is not able to recover from "Detected unreachable" issues like the second case below. 

We are currently using the configuration below. So I guess the only options left are to increase akka.watch.heartbeat.pause or move to 1.6 as you suggested; a rough sketch of the adjusted settings follows the current listing.

akka.client.timeout: 600s
akka.ask.timeout: 600s
akka.lookup.timeout: 600s
akka.watch.heartbeat.pause: 120s
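
For what it is worth, a minimal sketch of the combination we are considering, assuming the standard Flink 1.4 flink-conf.yaml keys (the heartbeat value is illustrative only, not something we have validated yet):

# tolerate longer heartbeat gaps before the RemoteWatcher declares a TM unreachable (illustrative value)
akka.watch.heartbeat.pause: 300s
# keep TMs exiting on fatal Akka errors so YARN reports the container as failed
taskmanager.exit-on-fatal-akka-error: true
# rate-based restarts, so the job survives the window until a replacement TM registers
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 13
restart-strategy.failure-rate.failure-rate-interval: 2 min
restart-strategy.failure-rate.delay: 10 s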

___________________________________________________
2018-09-11 16:30:44,861 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Container container_e29_1536261974019_1134_01_000036 failed. Exit status: -100
2018-09-11 16:30:44,862 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Diagnostics for container container_e29_1536261974019_1134_01_000036 in state COMPLETE : exitStatus=-100 diagnostics=Container released on a *lost* node
2018-09-11 16:30:44,862 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Total number of failed containers so far: 1
2018-09-11 16:30:44,862 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Requesting new TaskManager container with 20000 megabytes memory. Pending requests: 1
2018-09-11 16:30:44,862 INFO org.apache.flink.yarn.YarnJobManager - Task manager akka.tcp://flink@hello-world2-13:41157/user/taskmanager terminated.
at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
2018-09-11 16:30:44,868 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job streaming-searches-prod (1af29a616cba4bd76f920f7c80189535) switched from state RUNNING to FAILING.
at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
2018-09-11 16:30:49,700 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@hello-world2-13:41157] has failed, address is now gated for [5000] ms. Reason: [Disassociated].
The TaskManager on host 2-13 only shows that it received a SIGTERM:
2018-09-11 16:30:47,195 INFO org.apache.flink.yarn.YarnTaskManagerRunnerFactory - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
___________________________________________________
But then it failed with the same old issue:
2018-09-11 16:42:58,395 WARN akka.remote.RemoteWatcher - Detected unreachable: [akka.tcp://flink@hello-world3-3:44607]
2018-09-11 16:42:58,409 INFO org.apache.flink.yarn.YarnJobManager - Task manager akka.tcp://flink@hello-world3-3/user/taskmanager terminated.
at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)

___________________________________________________

On Thu, Sep 6, 2018 at 9:08 AM, Till Rohrmann <[hidden email]> wrote:
Hi Subramanya,

if the container is still running and the TM simply cannot connect to the JobManager, then the ResourceManager does not see a problem. The RM thinks in terms of containers, and as long as n containers are running, it won't start new ones. That's the reason why the TM should exit in order to terminate the container.

Have you tried using a newer Flink version? Lately we have reworked a good part of Flink's distributed architecture and added resource elasticity (starting with Flink 1.5).

Cheers,
Till

On Thu, Sep 6, 2018 at 4:58 AM Subramanya Suresh <[hidden email]> wrote:
Hi, 
With some additional research, 

Before the flag
I realized that for failed containers (i.e. containers that exited with a specific exit status) we were still requesting a new TM container and launching a TM. But for the "Detected unreachable: [akka.tcp://flink@blahabc.sfdc.net:123]" issue I do not see the container being marked as failed, nor a subsequent request for a TM. 

After `taskmanager.exit-on-fatal-akka-error: true`
I have not seen any "Detected unreachable: [akka.tcp://flink@blahabc.sfdc.net:123]" messages since we turned on `taskmanager.exit-on-fatal-akka-error: true`; I am not sure whether that is a coincidence or a direct effect of the change. 

Our Issue:
I realized we are still exiting the application, i.e. failing, when containers are lost. The reason is that before org.apache.flink.yarn.YarnFlinkResourceManager is able to acquire a new container, launch a TM in it, and have it reported as started, the org.apache.flink.runtime.jobmanager.scheduler throws a NoResourceAvailableException, which counts as a failure. In our case we had a fixed-delay restart strategy with 5 attempts, and we run out of them because of this. I am looking to solve this with a FailureRateRestartStrategy over a 2 minute interval (10 second restart delay, >12 failures), which gives the TM time to come back (it takes about 50 seconds); a rough sketch follows. 
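
For illustration, roughly what I have in mind (a sketch only, assuming the RestartStrategies API available to our 1.4.2 job; the values mirror the numbers above and are not tuned):

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FailureRateRestartSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // allow up to 13 failures within any 2 minute window, with 10s between restart attempts,
        // so the job survives the ~50s it takes a replacement TaskManager to register
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                13,                 // max failures per interval
                Time.minutes(2),    // failure rate measurement interval
                Time.seconds(10))); // delay between restart attempts
        // ... define the actual job here, then call env.execute(...)
    }
}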

Flink Bug
But I cannot help wondering why there is no interaction between the ResourceManager and the JobManager, i.e. why does the JobManager continue scheduling despite not having the required TMs? 

Logs to substantiate what I said above (only relevant). 

2018-09-03 06:17:13,932 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Container container_e28_1535135887442_1381_01_000097 completed successfully with diagnostics: Container released by application


2018-09-03 06:34:19,214 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@hello-world9-27-crz.ops.sfdc.net:46219] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
2018-09-03 06:34:19,214 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Container container_e27_1535135887442_1381_01_000102 failed. Exit status: -102
2018-09-03 06:34:19,215 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Diagnostics for container container_e27_1535135887442_1381_01_000102 in state COMPLETE : exitStatus=-102 diagnostics=Container preempted by scheduler
2018-09-03 06:34:19,215 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Total number of failed containers so far: 1
2018-09-03 06:34:19,215 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Requesting new TaskManager container with 20000 megabytes memory. Pending requests: 1
2018-09-03 06:34:19,216 INFO  org.apache.flink.yarn.YarnJobManager                          - Task manager akka.tcp://flink@hello-world9-27-crz.ops.sfdc.net:46219/user/taskmanager terminated.
2018-09-03 06:34:19,218 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from state RUNNING to FAILING.
java.lang.Exception: TaskManager was lost/killed: container_e27_1535135887442_1381_01_000102 @ hello-world9-27-crz.ops.sfdc.net (dataPort=40423)
2018-09-03 06:34:19,466 INFO  org.apache.flink.runtime.instance.InstanceManager             - Unregistered task manager hello-world9-27-crz.ops.sfdc.net/11.11.35.220. Number of registered task managers 144. Number of available slots 720

2018-09-03 06:34:24,717 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Received new container: container_e28_1535135887442_1381_01_000147 - Remaining pending container requests: 0
2018-09-03 06:34:24,717 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Launching TaskManager in container ContainerInLaunch @ 1535956464717: Container: [ContainerId: container_e28_1535135887442_1381_01_000147, NodeId: hello-world9-27-crz.ops.sfdc.net:8041, NodeHttpAddress: hello-world9-27-crz.ops.sfdc.net:8042, Resource: <memory:20480, vCores:5>, Priority: 0, Token: Token { kind: ContainerToken, service: 11.11.35.220:8041 }, ] on host hello-world9-27-crz.ops.sfdc.net
2018-09-03 06:34:29,256 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@hello-world9-27-crz.ops.sfdc.net:46219] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@hello-world9-27-crz.ops.sfdc.net:46219]] Caused by: [Connection refused: hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219]
2018-09-03 06:34:34,284 WARN  akka.remote.transport.netty.NettyTransport                    - Remote connection to [null] failed with java.net.ConnectException: Connection refused: hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219
2018-09-03 06:34:34,284 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@hello-world9-27-crz.ops.sfdc.net:46219] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@hello-world9-27-crz.ops.sfdc.net:46219]] Caused by: [Connection refused: hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219]
2018-09-03 06:34:34,540 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from state RESTARTING to CREATED.

2018-09-03 06:34:34,284 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@hello-world9-27-crz.ops.sfdc.net:46219] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@hello-world9-27-crz.ops.sfdc.net:46219]] Caused by: [Connection refused: hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219]
2018-09-03 06:34:34,540 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from state RESTARTING to CREATED.
2018-09-03 06:34:35,044 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from state CREATED to RUNNING.
2018-09-03 06:34:35,195 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from state RUNNING to FAILING.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration. Task to schedule: < Attempt #3 (Source: Custom Source -> (Map -> where: (AND(OR(=(UPPER(RuleMatch), _UTF-16LE'BRO'), =(UPPER(RuleMatch), _UTF-16LE'PAN'), =(UPPER(RuleMatch), _UTF-16LE'OLYMPUS'), =(UPPER(RuleMatch), _UTF-16LE'BIT9'), =(UPPER(RuleMatch), _UTF-16LE'OSQUERY'), =(UPPER(RuleMatch), _UTF-16LE'ILLUMIO')), OR(cidrMatch(IPSource, _UTF-16LE'10.0.0.0/8'), cidrMatch(IPSource, _UTF-16LE'192.168.0.0/16')), IS NULL(csvLookup(_UTF-16LE'a353A000000jGZb_whitelist.csv', _UTF




2018-09-03 06:34:39,248 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager container_e28_1535135887442_1381_01_000147 has started.
2018-09-03 06:34:45,235 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Container container_e28_1535135887442_1381_01_000147 failed. Exit status: -102
2018-09-03 06:34:45,235 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Diagnostics for container container_e28_1535135887442_1381_01_000147 in state COMPLETE : exitStatus=-102 diagnostics=Container preempted by scheduler
2018-09-03 06:34:45,236 INFO  org.apache.flink.yarn.YarnJobManager                          - Task manager akka.tcp://flink@hello-world9-27-crz.ops.sfdc.net:41966/user/taskmanager terminated.
2018-09-03 06:34:45,236 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Total number of failed containers so far: 2
2018-09-03 06:34:45,236 INFO  org.apache.flink.runtime.instance.InstanceManager             - Unregistered task manager hello-world9-27-crz.ops.sfdc.net/11.11.35.220. Number of registered task managers 144. Number of available slots 720.

2018-09-03 06:34:45,236 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Received new container: container_e28_1535135887442_1381_01_000202 - Remaining pending container requests: 0
2018-09-03 06:34:45,236 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Launching TaskManager in container ContainerInLaunch @ 1535956485236: Container: [ContainerId: container_e28_1535135887442_1381_01_000202, NodeId: hello-world4-31-crz.ops.sfdc.net:8041, NodeHttpAddress: hello-world4-31-crz.ops.sfdc.net:8042, Resource: <memory:20480, vCores:5>, Priority: 0, Token: Token { kind: ContainerToken, service: 11.11.34.160:8041 }, ] on host hello-world4-31-crz.ops.sfdc.net
2018-09-03 06:34:45,241 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Received new container: container_e28_1535135887442_1381_01_000203 - Remaining pending container requests: 0
2018-09-03 06:34:45,241 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Returning excess container container_e28_1535135887442_1381_01_000203

Notice there is no "TaskManager container_e28_1535135887442_1381_01_000202 has started" message. 
Instead I see: 
2018-09-03 06:34:56,894 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from state FAILING to FAILED.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration. Task to schedule: < Attempt #5 (Source: Custom Source -> (Map -> where: (AND(OR(=(UPPER(RuleMatch), _UTF-16LE'BRO'), =(UPPER(RuleMatch), _UTF-16LE'PAN'), =(UPPER(RuleMatch), _UTF-16LE'OLYMPUS'), =(UPPER(RuleMatch), _UTF-16LE'BIT9'), =(UPPER(R


2018-09-03 06:34:57,005 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Shutting down cluster with status FAILED : The monitored job with ID 96d3b4f60a80a898f44f87c5b06f6981 has failed to complete.
2018-09-03 06:34:57,007 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Unregistering application from the YARN Resource Manager




On Fri, Aug 31, 2018 at 4:00 PM, Subramanya Suresh <[hidden email]> wrote:
Thanks TIll, 
I do not see any Akka-related messages in that TaskManager's log after the initial startup; it looks like all is well there. So after the RemoteWatcher detects it as unreachable and the JobManager unregisters it, I do not see any other activity in the JobManager with regards to reallocation etc. 
- Does the quarantining of the TaskManager not happen until exit-on-fatal-akka-error is turned on? 
- Do the JobManager or the TaskManager try to reconnect to each other again? Is there a different setting for it?
- Does the JobManager not reallocate a TaskManager despite it being unregistered, until the TaskManager exits? I think it should, especially if it is not trying to establish a connection again. 

I will give the flag a try. 

Sincerely, 


On Fri, Aug 31, 2018 at 2:53 AM, Till Rohrmann <[hidden email]> wrote:
Could you check whether akka.tcp://flink@blahabc.sfdc.net:123/user/taskmanager is still running? E.g. tries to reconnect to the JobManager? If this is the case, then the container is still running and the YarnFlinkResourceManager thinks that everything is alright. You can activate that a TaskManager kills itself if it gets quarantined by setting `taskmanager.exit-on-fatal-akka-error: true` in the `flink-conf.yaml`.

Cheers,
Till

On Fri, Aug 31, 2018 at 10:43 AM Subramanya Suresh <[hidden email]> wrote:
Hi Till, 
Greatly appreciate your reply. 
We use version 1.4.2. I do not see anything unusual in the logs of the TM that was lost. Note: I have looked at many such failures and see the same pattern below. 

The JM logs above had most of what I have, but below is what I get when I search for flink.yarn (the logs are huge otherwise, given the number of SQL queries we run). The gist: Akka detects the TM as unreachable, the TM is marked lost and unregistered by the JM, operators start failing with NoResourceAvailableException since there is one less TM, and 5 retry attempts later the job goes down. 

………….

2018-08-29 23:02:41,216 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager container_e27_1535135887442_0906_01_000124 has started.

2018-08-29 23:02:50,095 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager container_e27_1535135887442_0906_01_000159 has started.

2018-08-29 23:02:50,409 INFO  org.apache.flink.yarn.YarnJobManager                          - Submitting job ab7e96a1659fbb4bfb3c6cd9bccb0335 (streaming-searches-prod).

2018-08-29 23:02:50,429 INFO  org.apache.flink.yarn.YarnJobManager                          - Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=5, delayBetweenRestartAttempts=10000) for ab7e96a1659fbb4bfb3c6cd9bccb0335.

2018-08-29 23:02:50,486 INFO  org.apache.flink.yarn.YarnJobManager                          - Running initialization on master for job streaming-searches-prod (ab7e96a1659fbb4bfb3c6cd9bccb0335).

2018-08-29 23:02:50,487 INFO  org.apache.flink.yarn.YarnJobManager                          - Successfully ran initialization on master in 0 ms.

2018-08-29 23:02:50,684 INFO  org.apache.flink.yarn.YarnJobManager                          - Using application-defined state backend for checkpoint/savepoint metadata: File State Backend @ hdfs://security-temp/savedSearches/checkpoint.

2018-08-29 23:02:50,920 INFO  org.apache.flink.yarn.YarnJobManager                          - Scheduling job ab7e96a1659fbb4bfb3c6cd9bccb0335 (streaming-searches-prod).

2018-08-29 23:12:05,240 INFO  org.apache.flink.yarn.YarnJobManager                          - Attempting to recover all jobs.

2018-08-29 23:12:05,716 INFO  org.apache.flink.yarn.YarnJobManager                          - There are 1 jobs to recover. Starting the job recovery.

2018-08-29 23:12:05,806 INFO  org.apache.flink.yarn.YarnJobManager                          - Attempting to recover job ab7e96a1659fbb4bfb3c6cd9bccb0335.

2018-08-29 23:12:07,308 INFO  org.apache.flink.yarn.YarnJobManager                          - Ignoring job recovery for ab7e96a1659fbb4bfb3c6cd9bccb0335, because it is already submitted.

2018-08-29 23:13:57,364 INFO  org.apache.flink.yarn.YarnJobManager                          - Task manager akka.tcp://flink@blahabc.sfdc.net:123/user/taskmanager terminated.

at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)

at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)

2018-08-29 23:13:58,645 INFO  org.apache.flink.yarn.YarnJobManager                          - Received message for non-existing checkpoint 1

2018-08-29 23:15:26,126 INFO  org.apache.flink.yarn.YarnJobManager                          - Job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335 is in terminal state FAILED. Shutting down session

2018-08-29 23:15:26,128 INFO  org.apache.flink.yarn.YarnJobManager                          - Stopping JobManager with final application status FAILED and diagnostics: The monitored job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.

2018-08-29 23:15:26,129 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Shutting down cluster with status FAILED : The monitored job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.

2018-08-29 23:15:26,146 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Unregistering application from the YARN Resource Manager

2018-08-29 23:15:31,363 INFO  org.apache.flink.yarn.YarnJobManager                          - Deleting yarn application files under hdfs://security-temp/user/sec-data-app/.flink/application_1535135887442_0906.

2018-08-29 23:15:31,370 INFO  org.apache.flink.yarn.YarnJobManager                          - Stopping JobManager akka.tcp://flink@blahxyz.sfdc.net:1235/user/jobmanager.

2018-08-29 23:15:41,225 ERROR org.apache.flink.yarn.YarnJobManager                          - Actor system shut down timed out.

2018-08-29 23:15:41,226 INFO  org.apache.flink.yarn.YarnJobManager                          - Shutdown completed. Stopping JVM.



On Thu, Aug 30, 2018 at 1:55 AM, Till Rohrmann <[hidden email]> wrote:
Hi Subramanya,

in order to help you I need a little bit of context. Which version of Flink are you running? The configuration yarn.reallocate-failed is deprecated since version Flink 1.1 and does not have an effect anymore.

What would be helpful is to get the full jobmanager log from you. If the YarnFlinkResourceManager gets notified that a container has failed, it should restart this container (it will do this 145 times). So if the YarnFlinkResourceManager does not get notified about a completed container, then this might indicate that the container is still running. So it would be good to check what the logs of container_e27_1535135887442_0906_01_000039 say.

Moreover, do you see the same problem occur when using the latest release Flink 1.6.0?

Cheers,
Till

On Thu, Aug 30, 2018 at 8:48 AM Subramanya Suresh <[hidden email]> wrote:

Hi, we are seeing a weird issue where one TaskManager is lost and then never re-allocated and subsequently operators fail with NoResourceAvailableException and after 5 restarts (we have FixedDelay restarts of 5) the application goes down. 

  • We have explicitly set yarn.reallocate-failed: true and have not specified the yarn.maximum-failed-containers (and see “org.apache.flink.yarn.YarnApplicationMasterRunner             - YARN application tolerates 145 failed TaskManager containers before giving up” in the logs). 
  • After the initial startup where all 145 TaskManagers are requested I never see any logs saying “Requesting new TaskManager container” to reallocate failed container. 


2018-08-29 23:13:56,655 WARN  akka.remote.RemoteWatcher                                     - Detected unreachable: [akka.tcp://flink@blahabc.sfdc.net:123]

2018-08-29 23:13:57,364 INFO  org.apache.flink.yarn.YarnJobManager                          - Task manager akka.tcp://flink@....net:123/user/taskmanager terminated.

java.lang.Exception: TaskManager was lost/killed: container_e27_1535135887442_0906_01_000039 @ blahabc.sfdc.net (dataPort=124)

at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)

at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)

at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)

at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)

at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)

at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)

at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)

at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)

at org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:107)

at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)

at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)

at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)

at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)

at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)

at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)

at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)

at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)

at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)

at akka.actor.Actor$class.aroundReceive(Actor.scala:502)

at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)

at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)

at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)

at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)

at akka.actor.ActorCell.invoke(ActorCell.scala:494)

at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)

at akka.dispatch.Mailbox.run(Mailbox.scala:224)

at akka.dispatch.Mailbox.exec(Mailbox.scala:234)

at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

java.lang.Exception: TaskManager was lost/killed: container_e27_1535135887442_0906_01_000039 @ blahabc.sfdc.net:42414 (dataPort=124)

at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)

at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)

at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)

at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)

at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)

at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)

at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)

at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)

at org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:107)

at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)

at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)

at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)

at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)

at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)

at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)

at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)

at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)

at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)

at akka.actor.Actor$class.aroundReceive(Actor.scala:502)

at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)

at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)

at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)

at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)

at akka.actor.ActorCell.invoke(ActorCell.scala:494)

at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)

at akka.dispatch.Mailbox.run(Mailbox.scala:224)

at akka.dispatch.Mailbox.exec(Mailbox.scala:234)

at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

2018-08-29 23:13:58,529 INFO  org.apache.flink.runtime.instance.InstanceManager             - Unregistered task manager blahabc.sfdc.net:/1.1.1.1. Number of registered task managers 144. Number of available slots 720.

2018-08-29 23:13:58,645 INFO  org.apache.flink.yarn.YarnJobManager                          - Received message for non-existing checkpoint 1

2018-08-29 23:14:39,969 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Recovering checkpoints from ZooKeeper.

2018-08-29 23:14:39,975 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Found 0 checkpoints in ZooKeeper.

2018-08-29 23:14:39,975 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Trying to fetch 0 checkpoints from storage.

org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration. Task to schedule: < Attempt #1


After 5 retries of our Sql query execution graph (we have configured 5 fixed delay restart), it outputs the below, 


2018-08-29 23:15:22,216 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Stopping checkpoint coordinator for job ab7e96a1659fbb4bfb3c6cd9bccb0335

2018-08-29 23:15:22,216 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Shutting down

2018-08-29 23:15:22,225 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Removing /prod/link/application_1535135887442_0906/checkpoints/ab7e96a1659fbb4bfb3c6cd9bccb0335 from ZooKeeper

2018-08-29 23:15:23,460 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter  - Shutting down.

2018-08-29 23:15:23,460 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter  - Removing /checkpoint-counter/ab7e96a1659fbb4bfb3c6cd9bccb0335 from ZooKeeper

2018-08-29 23:15:24,114 INFO  org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  - Removed job graph ab7e96a1659fbb4bfb3c6cd9bccb0335 from ZooKeeper.

2018-08-29 23:15:26,126 INFO  org.apache.flink.yarn.YarnJobManager                          - Job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335 is in terminal state FAILED. Shutting down session

2018-08-29 23:15:26,128 INFO  org.apache.flink.yarn.YarnJobManager                          - Stopping JobManager with final application status FAILED and diagnostics: The monitored job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.

2018-08-29 23:15:26,129 INFO  org.apache.flink.yarn.YarnFlinkResourceManager






Re: Job goes down after 1/145 TMs is lost (NoResourceAvailableException)

Subramanya Suresh
Hi Till, 
Update on this: we are still working through the 1.6.0 setup and bring-up. In a separate thread, we are running into issues, and we sense more on the horizon before we get it working. 
We are under some tight timelines, so I want to ask: how confident are you that the above would be fixed in 1.6.0, and that trying to fix it in 1.4.2 is the longer route? 

Sincerely, 

On Wed, Sep 12, 2018 at 6:49 PM, Subramanya Suresh <[hidden email]> wrote:
Hi Till,
After `taskmanager.exit-on-fatal-akka-error: true`
I have not seen any "Detected unreachable: [akka.tcp://flink@....net:123]" messages since we turned on `taskmanager.exit-on-fatal-akka-error: true`; I am not sure whether that is a coincidence or a direct effect of the change. 

Quick update: I can confirm our issue still happens even with the flag set to true. With a proper restart strategy (rate based, so it has enough time) the job can recover from container failures like the first case below, but it is not able to recover from "Detected unreachable" issues like the second case below. 

We are currently using the configuration below. So I guess the only options left are to increase akka.watch.heartbeat.pause or move to 1.6 as you suggested.

akka.client.timeout: 600s
akka.ask.timeout: 600s
akka.lookup.timeout: 600s
akka.watch.heartbeat.pause: 120s

___________________________________________________
2018-09-11 16:30:44,861 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Container container_e29_1536261974019_1134_01_000036 failed. Exit status: -100
2018-09-11 16:30:44,862 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Diagnostics for container container_e29_1536261974019_1134_01_000036 in state COMPLETE : exitStatus=-100 diagnostics=Container released on a *lost* node
2018-09-11 16:30:44,862 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Total number of failed containers so far: 1
2018-09-11 16:30:44,862 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Requesting new TaskManager container with 20000 megabytes memory. Pending requests: 1
2018-09-11 16:30:44,862 INFO org.apache.flink.yarn.YarnJobManager - Task manager akka.tcp://flink@hello-world2-13:41157/user/taskmanager terminated.
at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
2018-09-11 16:30:44,868 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job streaming-searches-prod (1af29a616cba4bd76f920f7c80189535) switched from state RUNNING to FAILING.
at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
2018-09-11 16:30:49,700 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@hello-world2-13:41157] has failed, address is now gated for [5000] ms. Reason: [Disassociated].
The TaskManager on host 2-13 only shows that it received a SIGTERM:
2018-09-11 16:30:47,195 INFO org.apache.flink.yarn.YarnTaskManagerRunnerFactory - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
___________________________________________________
But then it failed with the same old issue:
2018-09-11 16:42:58,395 WARN akka.remote.RemoteWatcher - Detected unreachable: [akka.tcp://flink@hello-world3-3:44607]
2018-09-11 16:42:58,409 INFO org.apache.flink.yarn.YarnJobManager - Task manager akka.tcp://flink@hello-world3-3/user/taskmanager terminated.
at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)

___________________________________________________

On Thu, Sep 6, 2018 at 9:08 AM, Till Rohrmann <[hidden email]> wrote:
Hi Subramanya,

if the container is still running and the TM simply cannot connect to the JobManager, then the ResourceManager does not see a problem. The RM thinks in terms of containers, and as long as n containers are running, it won't start new ones. That's the reason why the TM should exit in order to terminate the container.

Have you tried using a newer Flink version? Lately we have reworked a good part of Flink's distributed architecture and added resource elasticity (starting with Flink 1.5).

Cheers,
Till

On Thu, Sep 6, 2018 at 4:58 AM Subramanya Suresh <[hidden email]> wrote:
Hi, 
With some additional research, 

Before the flag
I realized that for failed containers (i.e. containers that exited with a specific exit status) we were still requesting a new TM container and launching a TM. But for the "Detected unreachable: [akka.tcp://flink@....net:123]" issue I do not see the container being marked as failed, nor a subsequent request for a TM. 

After `taskmanager.exit-on-fatal-akka-error: true`
I have not seen any "Detected unreachable: [akka.tcp://flink@....net:123]" messages since we turned on `taskmanager.exit-on-fatal-akka-error: true`; I am not sure whether that is a coincidence or a direct effect of the change. 

Our Issue:
I realized we are still exiting the application, i.e. failing, when containers are lost. The reason is that before org.apache.flink.yarn.YarnFlinkResourceManager is able to acquire a new container, launch a TM in it, and have it reported as started, the org.apache.flink.runtime.jobmanager.scheduler throws a NoResourceAvailableException, which counts as a failure. In our case we had a fixed-delay restart strategy with 5 attempts, and we run out of them because of this. I am looking to solve this with a FailureRateRestartStrategy over a 2 minute interval (10 second restart delay, >12 failures), which gives the TM time to come back (it takes about 50 seconds). 

Flink Bug
But I cannot help wondering why there is no interaction between the ResourceManager and the JobManager, i.e. why does the JobManager continue scheduling despite not having the required TMs? 

Logs to substantiate what I said above (only relevant). 

2018-09-03 06:17:13,932 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Container container_e28_1535135887442_1381_01_000097 completed successfully with diagnostics: Container released by application


2018-09-03 06:34:19,214 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@hello-world9-27-crz.ops.sfdc.net:46219] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
2018-09-03 06:34:19,214 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Container container_e27_1535135887442_1381_01_000102 failed. Exit status: -102
2018-09-03 06:34:19,215 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Diagnostics for container container_e27_1535135887442_1381_01_000102 in state COMPLETE : exitStatus=-102 diagnostics=Container preempted by scheduler
2018-09-03 06:34:19,215 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Total number of failed containers so far: 1
2018-09-03 06:34:19,215 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Requesting new TaskManager container with 20000 megabytes memory. Pending requests: 1
2018-09-03 06:34:19,216 INFO  org.apache.flink.yarn.YarnJobManager                          - Task manager akka.tcp://flink@hello-world9-27-crz.ops.sfdc.net:46219/user/taskmanager terminated.
2018-09-03 06:34:19,218 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from state RUNNING to FAILING.
java.lang.Exception: TaskManager was lost/killed: container_e27_1535135887442_1381_01_000102 @ hello-world9-27-crz.ops.sfdc.net (dataPort=40423)
2018-09-03 06:34:19,466 INFO  org.apache.flink.runtime.instance.InstanceManager             - Unregistered task manager hello-world9-27-crz.ops.sfdc.net/11.11.35.220. Number of registered task managers 144. Number of available slots 720

2018-09-03 06:34:24,717 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Received new container: container_e28_1535135887442_1381_01_000147 - Remaining pending container requests: 0
2018-09-03 06:34:24,717 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Launching TaskManager in container ContainerInLaunch @ 1535956464717: Container: [ContainerId: container_e28_1535135887442_1381_01_000147, NodeId: hello-world9-27-crz.ops.sfdc.net:8041, NodeHttpAddress: hello-world9-27-crz.ops.sfdc.net:8042, Resource: <memory:20480, vCores:5>, Priority: 0, Token: Token { kind: ContainerToken, service: 11.11.35.220:8041 }, ] on host hello-world9-27-crz.ops.sfdc.net
2018-09-03 06:34:29,256 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@hello-world9-27-crz.ops.sfdc.net:46219] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@hello-world9-27-crz.ops.sfdc.net:46219]] Caused by: [Connection refused: hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219]
2018-09-03 06:34:34,284 WARN  akka.remote.transport.netty.NettyTransport                    - Remote connection to [null] failed with java.net.ConnectException: Connection refused: hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219
2018-09-03 06:34:34,284 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@hello-world9-27-crz.ops.sfdc.net:46219] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@hello-world9-27-crz.ops.sfdc.net:46219]] Caused by: [Connection refused: hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219]
2018-09-03 06:34:34,540 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from state RESTARTING to CREATED.

2018-09-03 06:34:34,284 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@hello-world9-27-crz.ops.sfdc.net:46219] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@hello-world9-27-crz.ops.sfdc.net:46219]] Caused by: [Connection refused: hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219]
2018-09-03 06:34:34,540 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from state RESTARTING to CREATED.
2018-09-03 06:34:35,044 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from state CREATED to RUNNING.
2018-09-03 06:34:35,195 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from state RUNNING to FAILING.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration. Task to schedule: < Attempt #3 (Source: Custom Source -> (Map -> where: (AND(OR(=(UPPER(RuleMatch), _UTF-16LE'BRO'), =(UPPER(RuleMatch), _UTF-16LE'PAN'), =(UPPER(RuleMatch), _UTF-16LE'OLYMPUS'), =(UPPER(RuleMatch), _UTF-16LE'BIT9'), =(UPPER(RuleMatch), _UTF-16LE'OSQUERY'), =(UPPER(RuleMatch), _UTF-16LE'ILLUMIO')), OR(cidrMatch(IPSource, _UTF-16LE'10.0.0.0/8'), cidrMatch(IPSource, _UTF-16LE'192.168.0.0/16')), IS NULL(csvLookup(_UTF-16LE'a353A000000jGZb_whitelist.csv', _UTF




2018-09-03 06:34:39,248 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager container_e28_1535135887442_1381_01_000147 has started.
2018-09-03 06:34:45,235 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Container container_e28_1535135887442_1381_01_000147 failed. Exit status: -102
2018-09-03 06:34:45,235 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Diagnostics for container container_e28_1535135887442_1381_01_000147 in state COMPLETE : exitStatus=-102 diagnostics=Container preempted by scheduler
2018-09-03 06:34:45,236 INFO  org.apache.flink.yarn.YarnJobManager                          - Task manager akka.tcp://flink@hello-world9-27-crz.ops.sfdc.net:41966/user/taskmanager terminated.
2018-09-03 06:34:45,236 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Total number of failed containers so far: 2
2018-09-03 06:34:45,236 INFO  org.apache.flink.runtime.instance.InstanceManager             - Unregistered task manager hello-world9-27-crz.ops.sfdc.net/11.11.35.220. Number of registered task managers 144. Number of available slots 720.

2018-09-03 06:34:45,236 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Received new container: container_e28_1535135887442_1381_01_000202 - Remaining pending container requests: 0
2018-09-03 06:34:45,236 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Launching TaskManager in container ContainerInLaunch @ 1535956485236: Container: [ContainerId: container_e28_1535135887442_1381_01_000202, NodeId: hello-world4-31-crz.ops.sfdc.net:8041, NodeHttpAddress: hello-world4-31-crz.ops.sfdc.net:8042, Resource: <memory:20480, vCores:5>, Priority: 0, Token: Token { kind: ContainerToken, service: 11.11.34.160:8041 }, ] on host hello-world4-31-crz.ops.sfdc.net
2018-09-03 06:34:45,241 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Received new container: container_e28_1535135887442_1381_01_000203 - Remaining pending container requests: 0
2018-09-03 06:34:45,241 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Returning excess container container_e28_1535135887442_1381_01_000203

Notice there is no "TaskManager container_e28_1535135887442_1381_01_000202 has started" message. 
Instead I see: 
2018-09-03 06:34:56,894 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from state FAILING to FAILED.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration. Task to schedule: < Attempt #5 (Source: Custom Source -> (Map -> where: (AND(OR(=(UPPER(RuleMatch), _UTF-16LE'BRO'), =(UPPER(RuleMatch), _UTF-16LE'PAN'), =(UPPER(RuleMatch), _UTF-16LE'OLYMPUS'), =(UPPER(RuleMatch), _UTF-16LE'BIT9'), =(UPPER(R


2018-09-03 06:34:57,005 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Shutting down cluster with status FAILED : The monitored job with ID 96d3b4f60a80a898f44f87c5b06f6981 has failed to complete.
2018-09-03 06:34:57,007 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Unregistering application from the YARN Resource Manager




On Fri, Aug 31, 2018 at 4:00 PM, Subramanya Suresh <[hidden email]> wrote:
Thanks TIll, 
I do not see any Akka-related messages in that TaskManager's log after the initial startup; it looks like all is well there. So after the RemoteWatcher detects it as unreachable and the JobManager unregisters it, I do not see any other activity in the JobManager with regards to reallocation etc. 
- Does the quarantining of the TaskManager not happen until exit-on-fatal-akka-error is turned on? 
- Do the JobManager or the TaskManager try to reconnect to each other again? Is there a different setting for it?
- Does the JobManager not reallocate a TaskManager despite it being unregistered, until the TaskManager exits? I think it should, especially if it is not trying to establish a connection again. 

I will give the flag a try. 

Sincerely, 


On Fri, Aug 31, 2018 at 2:53 AM, Till Rohrmann <[hidden email]> wrote:
Could you check whether akka.tcp://flink@blahabc.sfdc.net:123/user/taskmanager is still running? E.g. tries to reconnect to the JobManager? If this is the case, then the container is still running and the YarnFlinkResourceManager thinks that everything is alright. You can activate that a TaskManager kills itself if it gets quarantined by setting `taskmanager.exit-on-fatal-akka-error: true` in the `flink-conf.yaml`.

Cheers,
Till
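
For reference, the corresponding flink-conf.yaml entry would simply be (taken from Till's suggestion above; whether it fully covers the "Detected unreachable" case is exactly what gets tested later in this thread):

taskmanager.exit-on-fatal-akka-error: true

The intent, as Till explains further down the thread, is that a quarantined TM process exits, the YARN container completes, and the ResourceManager can then request a replacement.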

On Fri, Aug 31, 2018 at 10:43 AM Subramanya Suresh <[hidden email]> wrote:
Hi Till, 
Greatly appreciate your reply. 
We use version 1.4.2. I do not see anything unusual in the logs for the TM that was lost. Note: I have looked at many such failures and see the same pattern below. 

The JM logs above had most of what I had, but below is what I have when I search for flink.yarn (we have huge logs otherwise, given the number of SQL queries we run). The gist is: Akka detects the TM as unreachable, the TM is marked lost and unregistered by the JM, operators start failing with NoResourceAvailableException since there is one less TM, and 5 retry attempts later the job goes down. 

………….

2018-08-29 23:02:41,216 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager container_e27_1535135887442_0906_01_000124 has started.

2018-08-29 23:02:50,095 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager container_e27_1535135887442_0906_01_000159 has started.

2018-08-29 23:02:50,409 INFO  org.apache.flink.yarn.YarnJobManager                          - Submitting job ab7e96a1659fbb4bfb3c6cd9bccb0335 (streaming-searches-prod).

2018-08-29 23:02:50,429 INFO  org.apache.flink.yarn.YarnJobManager                          - Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=5, delayBetweenRestartAttempts=10000) for ab7e96a1659fbb4bfb3c6cd9bccb0335.

2018-08-29 23:02:50,486 INFO  org.apache.flink.yarn.YarnJobManager                          - Running initialization on master for job streaming-searches-prod (ab7e96a1659fbb4bfb3c6cd9bccb0335).

2018-08-29 23:02:50,487 INFO  org.apache.flink.yarn.YarnJobManager                          - Successfully ran initialization on master in 0 ms.

2018-08-29 23:02:50,684 INFO  org.apache.flink.yarn.YarnJobManager                          - Using application-defined state backend for checkpoint/savepoint metadata: File State Backend @ hdfs://security-temp/savedSearches/checkpoint.

2018-08-29 23:02:50,920 INFO  org.apache.flink.yarn.YarnJobManager                          - Scheduling job ab7e96a1659fbb4bfb3c6cd9bccb0335 (streaming-searches-prod).

2018-08-29 23:12:05,240 INFO  org.apache.flink.yarn.YarnJobManager                          - Attempting to recover all jobs.

2018-08-29 23:12:05,716 INFO  org.apache.flink.yarn.YarnJobManager                          - There are 1 jobs to recover. Starting the job recovery.

2018-08-29 23:12:05,806 INFO  org.apache.flink.yarn.YarnJobManager                          - Attempting to recover job ab7e96a1659fbb4bfb3c6cd9bccb0335.

2018-08-29 23:12:07,308 INFO  org.apache.flink.yarn.YarnJobManager                          - Ignoring job recovery for ab7e96a1659fbb4bfb3c6cd9bccb0335, because it is already submitted.

2018-08-29 23:13:57,364 INFO  org.apache.flink.yarn.YarnJobManager                          - Task manager akka.tcp://flink@blahabc.sfdc.net:123/user/taskmanager terminated.

at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)

at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)

2018-08-29 23:13:58,645 INFO  org.apache.flink.yarn.YarnJobManager                          - Received message for non-existing checkpoint 1

2018-08-29 23:15:26,126 INFO  org.apache.flink.yarn.YarnJobManager                          - Job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335 is in terminal state FAILED. Shutting down session

2018-08-29 23:15:26,128 INFO  org.apache.flink.yarn.YarnJobManager                          - Stopping JobManager with final application status FAILED and diagnostics: The monitored job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.

2018-08-29 23:15:26,129 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Shutting down cluster with status FAILED : The monitored job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.

2018-08-29 23:15:26,146 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Unregistering application from the YARN Resource Manager

2018-08-29 23:15:31,363 INFO  org.apache.flink.yarn.YarnJobManager                          - Deleting yarn application files under hdfs://security-temp/user/sec-data-app/.flink/application_1535135887442_0906.

2018-08-29 23:15:31,370 INFO  org.apache.flink.yarn.YarnJobManager                          - Stopping JobManager akka.tcp://flink@blahxyz.sfdc.net:1235/user/jobmanager.

2018-08-29 23:15:41,225 ERROR org.apache.flink.yarn.YarnJobManager                          - Actor system shut down timed out.

2018-08-29 23:15:41,226 INFO  org.apache.flink.yarn.YarnJobManager                          - Shutdown completed. Stopping JVM.



On Thu, Aug 30, 2018 at 1:55 AM, Till Rohrmann <[hidden email]> wrote:
Hi Subramanya,

in order to help you I need a little bit of context. Which version of Flink are you running? The configuration yarn.reallocate-failed is deprecated since version Flink 1.1 and does not have an effect anymore.

What would be helpful is to get the full jobmanager log from you. If the YarnFlinkResourceManager gets notified that a container has failed, it should restart this container (it will do this 145 times). So if the YarnFlinkResourceManager does not get notified about a completed container, then this might indicate that the container is still running. So it would be good to check what the logs of container_e27_1535135887442_0906_01_000039 say.

Moreover, do you see the same problem occur when using the latest release Flink 1.6.0?

Cheers,
Till



Re: Job goes down after 1/145 TMs is lost (NoResourceAvailableException)

Till Rohrmann
Hi,

the quarantining is not really solvable in 1.4.x without restarting the Flink component. Thus, I would recommend upgrading to the latest Flink version (1.6.0; 1.6.1 will be released later this week).

In order to tell what would be the shorter route I would need to know a bit more details about the problems you are facing.

Cheers,
Till

On Mon, Sep 17, 2018 at 9:24 PM Subramanya Suresh <[hidden email]> wrote:
Hi Till, 
Update on this: we are still working our way through the 1.6.0 setup and run. In a separate thread, we are running into issues, and sense more on the horizon before we get it working. 
We are under some tight timelines, so I want to ask how confident you are that the above would be fixed in 1.6.0, and that trying to fix it in 1.4.2 is the longer route. 

Sincerely, 

On Wed, Sep 12, 2018 at 6:49 PM, Subramanya Suresh <[hidden email]> wrote:
Hi Till,
After setting `taskmanager.exit-on-fatal-akka-error: true`:
I do not see any "Detected unreachable: [akka.tcp://flink@...:123]" messages since we turned the flag on; I am not sure if that is a coincidence or a direct impact of this change. 

Quick update: I can confirm our issue still happens even with the flag set to true. With a proper restart strategy (rate based, so that it gives it enough time) the job can recover from container failures like the first case below, but it is not able to recover from "Detected unreachable" issues like the second case below. 

We are currently using the configuration below. So I guess the only options left are to increase the heartbeat pause (see the sketch after the list) or move to 1.6 as you suggested.

akka.client.timeout: 600s
akka.ask.timeout: 600s
akka.lookup.timeout: 600s
akka.watch.heartbeat.pause: 120s
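
A possible sketch of the "increase the heartbeat.pause" option in flink-conf.yaml (the 300s value is purely illustrative and not a recommendation from this thread; the pause governs how long Akka's DeathWatch tolerates missing heartbeats before marking a TM unreachable, so a larger value also delays detection of genuinely dead TMs):

akka.watch.heartbeat.pause: 300s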

___________________________________________________
2018-09-11 16:30:44,861 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Container container_e29_1536261974019_1134_01_000036 failed. Exit status: -100
2018-09-11 16:30:44,862 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Diagnostics for container container_e29_1536261974019_1134_01_000036 in state COMPLETE : exitStatus=-100 diagnostics=Container released on a *lost* node
2018-09-11 16:30:44,862 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Total number of failed containers so far: 1
2018-09-11 16:30:44,862 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Requesting new TaskManager container with 20000 megabytes memory. Pending requests: 1
2018-09-11 16:30:44,862 INFO org.apache.flink.yarn.YarnJobManager - Task manager akka.tcp://flink@hellow-world2-13:41157/user/taskmanager terminated.
at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
2018-09-11 16:30:44,868 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job streaming-searches-prod (1af29a616cba4bd76f920f7c80189535) switched from state RUNNING to FAILING.
at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
2018-09-11 16:30:49,700 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@hello-world2-13:41157] has failed, address is now gated for [5000] ms. Reason: [Disassociated].
This container (2-13) just received:
2018-09-11 16:30:47,195 INFO org.apache.flink.yarn.YarnTaskManagerRunnerFactory - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
___________________________________________________
But then it failed with the same old:
2018-09-11 16:42:58,395 WARN akka.remote.RemoteWatcher - Detected unreachable: [akka.tcp://flink@hello-world3-3:44607]
2018-09-11 16:42:58,409 INFO org.apache.flink.yarn.YarnJobManager - Task manager akka.tcp://flink@hello-world3-3/user/taskmanager terminated.
at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)

___________________________________________________

On Thu, Sep 6, 2018 at 9:08 AM, Till Rohrmann <[hidden email]> wrote:
Hi Subramanya,

if the container is still running and the TM simply cannot connect to the JobManager, then the ResourceManager does not see a problem. The RM thinks in terms of containers, and as long as n containers are running, it won't start new ones. That's the reason why the TM should exit in order to terminate the container.

Have you tried using a newer Flink version? Lately we have reworked a good part of Flink's distributed architecture and added resource elasticity (starting with Flink 1.5).

Cheers,
Till

On Thu, Sep 6, 2018 at 4:58 AM Subramanya Suresh <[hidden email]> wrote:
Hi, 
With some additional research:

Before the flag
I realized that for failed containers (those that exited for a specific reason) we were still requesting a new TM container and launching a TM. But for the "Detected unreachable: [akka.tcp://flink@...:123]" issue I do not see the container marked as failed, nor a subsequent request for a TM. 

After `taskmanager.exit-on-fatal-akka-error: true`:
I do not see any "Detected unreachable: [akka.tcp://flink@...:123]" messages since we turned the flag on; I am not sure if that is a coincidence or a direct impact of this change. 

Our Issue:
I realized we are still exiting the application, i.e. failing, when containers are lost. The reason is that before org.apache.flink.yarn.YarnFlinkResourceManager is able to acquire a new TM container, launch the TM, and have it reported as started, the org.apache.flink.runtime.jobmanager.scheduler throws a NoResourceAvailableException that causes a failure. In our case we had a fixed-delay restart strategy with 5 attempts, and we run out of them because of this. I am looking to solve this with a FailureRateRestartStrategy over a 2 minute interval (10 second restart delay, >12 failures), which gives the TM time to come back (it takes about 50 seconds); see the sketch below. 
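
For reference, a minimal flink-conf.yaml sketch of such a failure-rate restart strategy (the values only mirror the numbers above and are assumptions, not something validated in this thread):

restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 13
restart-strategy.failure-rate.failure-rate-interval: 2 min
restart-strategy.failure-rate.delay: 10 s

The same thing can also be set programmatically per job via RestartStrategies.failureRateRestart(...) if the cluster-wide default should stay untouched.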

Flink Bug
But I cannot help but wonder why there is no interaction between the ResourceManager and the JobManager, i.e. why does the JobManager continue scheduling despite not having the required TMs? 

Logs to substantiate what I said above (only the relevant lines): 

2018-09-03 06:17:13,932 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Container container_e28_1535135887442_1381_01_000097 completed successfully with diagnostics: Container released by application


2018-09-03 06:34:19,214 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@...:46219] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
2018-09-03 06:34:19,214 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Container container_e27_1535135887442_1381_01_000102 failed. Exit status: -102
2018-09-03 06:34:19,215 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Diagnostics for container container_e27_1535135887442_1381_01_000102 in state COMPLETE : exitStatus=-102 diagnostics=Container preempted by scheduler
2018-09-03 06:34:19,215 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Total number of failed containers so far: 1
2018-09-03 06:34:19,215 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Requesting new TaskManager container with 20000 megabytes memory. Pending requests: 1
2018-09-03 06:34:19,216 INFO  org.apache.flink.yarn.YarnJobManager                          - Task manager akka.tcp://flink@...:46219/user/taskmanager terminated.
2018-09-03 06:34:19,218 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from state RUNNING to FAILING.
java.lang.Exception: TaskManager was lost/killed: container_e27_1535135887442_1381_01_000102 @ hello-world9-27-crz.ops.sfdc.net (dataPort=40423)
2018-09-03 06:34:19,466 INFO  org.apache.flink.runtime.instance.InstanceManager             - Unregistered task manager hello-world9-27-crz.ops.sfdc.net/11.11.35.220. Number of registered task managers 144. Number of available slots 720

2018-09-03 06:34:24,717 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Received new container: container_e28_1535135887442_1381_01_000147 - Remaining pending container requests: 0
2018-09-03 06:34:24,717 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Launching TaskManager in container ContainerInLaunch @ 1535956464717: Container: [ContainerId: container_e28_1535135887442_1381_01_000147, NodeId: hello-world9-27-crz.ops.sfdc.net:8041, NodeHttpAddress: hello-world9-27-crz.ops.sfdc.net:8042, Resource: <memory:20480, vCores:5>, Priority: 0, Token: Token { kind: ContainerToken, service: 11.11.35.220:8041 }, ] on host hello-world9-27-crz.ops.sfdc.net
2018-09-03 06:34:29,256 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@...:46219] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@...:46219]] Caused by: [Connection refused: hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219]
2018-09-03 06:34:34,284 WARN  akka.remote.transport.netty.NettyTransport                    - Remote connection to [null] failed with java.net.ConnectException: Connection refused: hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219
2018-09-03 06:34:34,284 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@...:46219] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@...:46219]] Caused by: [Connection refused: hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219]
2018-09-03 06:34:34,540 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from state RESTARTING to CREATED.

2018-09-03 06:34:35,044 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from state CREATED to RUNNING.
2018-09-03 06:34:35,195 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from state RUNNING to FAILING.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration. Task to schedule: < Attempt #3 (Source: Custom Source -> (Map -> where: (AND(OR(=(UPPER(RuleMatch), _UTF-16LE'BRO'), =(UPPER(RuleMatch), _UTF-16LE'PAN'), =(UPPER(RuleMatch), _UTF-16LE'OLYMPUS'), =(UPPER(RuleMatch), _UTF-16LE'BIT9'), =(UPPER(RuleMatch), _UTF-16LE'OSQUERY'), =(UPPER(RuleMatch), _UTF-16LE'ILLUMIO')), OR(cidrMatch(IPSource, _UTF-16LE'10.0.0.0/8'), cidrMatch(IPSource, _UTF-16LE'192.168.0.0/16')), IS NULL(csvLookup(_UTF-16LE'a353A000000jGZb_whitelist.csv', _UTF




2018-09-03 06:34:39,248 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager container_e28_1535135887442_1381_01_000147 has started.
2018-09-03 06:34:45,235 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Container container_e28_1535135887442_1381_01_000147 failed. Exit status: -102
2018-09-03 06:34:45,235 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Diagnostics for container container_e28_1535135887442_1381_01_000147 in state COMPLETE : exitStatus=-102 diagnostics=Container preempted by scheduler
2018-09-03 06:34:45,236 INFO  org.apache.flink.yarn.YarnJobManager                          - Task manager akka.tcp://flink@...:41966/user/taskmanager terminated.
2018-09-03 06:34:45,236 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Total number of failed containers so far: 2
2018-09-03 06:34:45,236 INFO  org.apache.flink.runtime.instance.InstanceManager             - Unregistered task manager hello-world9-27-crz.ops.sfdc.net/11.11.35.220. Number of registered task managers 144. Number of available slots 720.

2018-09-03 06:34:45,236 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Received new container: container_e28_1535135887442_1381_01_000202 - Remaining pending container requests: 0
2018-09-03 06:34:45,236 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Launching TaskManager in container ContainerInLaunch @ 1535956485236: Container: [ContainerId: container_e28_1535135887442_1381_01_000202, NodeId: hello-world4-31-crz.ops.sfdc.net:8041, NodeHttpAddress: hello-world4-31-crz.ops.sfdc.net:8042, Resource: <memory:20480, vCores:5>, Priority: 0, Token: Token { kind: ContainerToken, service: 11.11.34.160:8041 }, ] on host hello-world4-31-crz.ops.sfdc.net
2018-09-03 06:34:45,241 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Received new container: container_e28_1535135887442_1381_01_000203 - Remaining pending container requests: 0
2018-09-03 06:34:45,241 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Returning excess container container_e28_1535135887442_1381_01_000203

Notice that there is no "TaskManager container_e28_1535135887442_1381_01_000202 has started" message. Instead I see:
2018-09-03 06:34:56,894 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from state FAILING to FAILED.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration. Task to schedule: < Attempt #5 (Source: Custom Source -> (Map -> where: (AND(OR(=(UPPER(RuleMatch), _UTF-16LE'BRO'), =(UPPER(RuleMatch), _UTF-16LE'PAN'), =(UPPER(RuleMatch), _UTF-16LE'OLYMPUS'), =(UPPER(RuleMatch), _UTF-16LE'BIT9'), =(UPPER(R


2018-09-03 06:34:57,005 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Shutting down cluster with status FAILED : The monitored job with ID 96d3b4f60a80a898f44f87c5b06f6981 has failed to complete.
2018-09-03 06:34:57,007 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Unregistering application from the YARN Resource Manager






Re: Job goes down after 1/145 TMs is lost (NoResourceAvailableException)

Subramanya Suresh
Hi Till, 
Thanks for the information. Please let me know what additional details you need. 
Let's follow this up on the Slack channel (which has Jeff and Konstantin), c-salesforce-poc.

Sincerely, 

On Tue, Sep 18, 2018 at 1:14 AM, Till Rohrmann <[hidden email]> wrote:
Hi,

the quarantining is not really solvable 1.4.x without restarting the Flink component. Thus, I would recommend upgrading to the latest Flink (1.6.0, 1.6.1 will be released later this week) version.

In order to tell what would be the shorter route I would need to know a bit more details about the problems you are facing.

Cheers,
Till

On Mon, Sep 17, 2018 at 9:24 PM Subramanya Suresh <[hidden email]> wrote:
Hi Till, 
Update on this. We are still weeding past 1.6.0 setup and run. In a separate thread, we are running into issues, and sense more on the horizon before we get it working. 
We are under some tight timelines, so want to ask how confident you are that the above would be fixed in 1.6.0 ? and that trying to fix it in 1.4.2 is the longer route. 

Sincerely, 

On Wed, Sep 12, 2018 at 6:49 PM, Subramanya Suresh <[hidden email]> wrote:
Hi Till,
After taskmanager.exit-on-fatal-akka-error: true` 
I do not see any unreachable: [akka.tcp://flink@blahabc.sfdc.net:123] since we turned on taskmanager.exit-on-fatal-akka-error: true, I am not sure if that is a coincidence or a direct impact of this change. 

Quick update, I can confirm our issue still happens even with the flag being true. With a proper restart strategy (rate based, that gives it enough time) it can recover from container failures like the first case below, but not able to recover from "detected unreachable" issues like the second case below. 

We are currently using the below configuration. So I guess the only options left are to increase the heartbeat.pause or move to 1.6 as you suggested

akka.client.timeout: 600s
akka.ask.timeout: 600s
akka.lookup.timeout: 600s
akka.watch.heartbeat.pause: 120s

___________________________________________________
8-09-11 16:30:44,861 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Container container_e29_1536261974019_1134_01_000036 failed. Exit status: -100
<a href="tel:2018091116" target="_blank">2018-09-11 16:30:44,862 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Diagnostics for container container_e29_1536261974019_1134_01_000036 in state COMPLETE : exitStatus=-100 diagnostics=Container released on a *lost* node
<a href="tel:2018091116" target="_blank">2018-09-11 16:30:44,862 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Total number of failed containers so far: 1
<a href="tel:2018091116" target="_blank">2018-09-11 16:30:44,862 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Requesting new TaskManager container with 20000 megabytes memory. Pending requests: 1
<a href="tel:2018091116" target="_blank">2018-09-11 16:30:44,862 INFO org.apache.flink.yarn.YarnJobManager - Task manager akka.tcp://flink@hellow-world2-13:41157/user/taskmanager terminated.
at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
<a href="tel:2018091116" target="_blank">2018-09-11 16:30:44,868 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job streaming-searches-prod (1af29a616cba4bd76f920f7c80189535) switched from state RUNNING to FAILING.
at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
<a href="tel:2018091116" target="_blank">2018-09-11 16:30:49,700 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@hello-world2-13:41157] has failed, address is now gated for [5000] ms. Reason: [Disassociated].
This container 2-13, just has a received
<a href="tel:2018091116" target="_blank">2018-09-11 16:30:47,195 INFO org.apache.flink.yarn.YarnTaskManagerRunnerFactory - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
___________________________________________________
But then failed with the same old
018-09-11 16:42:58,395 WARN akka.remote.RemoteWatcher - Detected unreachable: [akka.tcp://flink@hello-world3-3:44607]
<a href="tel:2018091116" target="_blank">2018-09-11 16:42:58,409 INFO org.apache.flink.yarn.YarnJobManager - Task manager akka.tcp://flink@hello-world3-3/user/taskmanager terminated.
at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)

___________________________________________________

On Thu, Sep 6, 2018 at 9:08 AM, Till Rohrmann <[hidden email]> wrote:
Hi Subramanya,

if the container is still running and the TM can simply not connect to the JobManager, then the ResourceManager does not see a problem. The RM things in terms of containers and as long as n containers are running, it won't start new ones. That's the reason why the TM should exit in order to terminate the container.

Have you tried using a newer Flink version? Lately we have reworked a good part of Flink's distributed architecture and added resource elasticity (starting with Flink 1.5).

Cheers,
Till

On Thu, Sep 6, 2018 at 4:58 AM Subramanya Suresh <[hidden email]> wrote:
Hi, 
With some additional research, 

Before the flag
I realized for failed containers (that exited for a specific  we still were Requesting new TM container and launching TM). But for the "Detected unreachable: [akka.tcp://flink@blahabc.sfdc.net:123]" issue I do not see the container marked as failed and a subsequent request for TM. 

After taskmanager.exit-on-fatal-akka-error: true` 
I do not see any unreachable: [akka.tcp://flink@blahabc.sfdc.net:123] since we turned on taskmanager.exit-on-fatal-akka-error: true, I am not sure if that is a coincidence or a direct impact of this change. 

Our Issue:
I realized we are still exiting the application, i.e. failing when the containers are lost. The reason for this is before org.apache.flink.yarn.YarnFlinkResouceManager is able to acquire a new container TM, launch TM and it is reported as started, the org.apache.flink.runtime.jobmanager.scheduler throws a NoResourceAvailableException that causes a failure. In our case we had fixed restart strategy with 5, and we are running out of it because of this. I am looking to solve this with a FailureRateRestartStrategy over 2 minutes interval (10 second restart delay, >12 failures), that lets the TM come back (takes about 50 seconds). 

Flink Bug
But I cannot help but think why there is no interaction between the ResourceManager and JobManager, i.e. why is the jobmanager continuing with the processing despite not having the required TMs ? 

Logs to substantiate what I said above (only relevant). 

018-09-03 06:17:13,932 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Container container_e28_1535135887442_1381_01_000097 completed successfully with diagnostics: Container released by application


2018-09-03 06:34:19,214 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@hello-world9-27-crz.ops.sfdc.net:46219] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
2018-09-03 06:34:19,214 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Container container_e27_1535135887442_1381_01_000102 failed. Exit status: -102
2018-09-03 06:34:19,215 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Diagnostics for container container_e27_1535135887442_1381_01_000102 in state COMPLETE : exitStatus=-102 diagnostics=Container preempted by scheduler
2018-09-03 06:34:19,215 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Total number of failed containers so far: 1
2018-09-03 06:34:19,215 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Requesting new TaskManager container with 20000 megabytes memory. Pending requests: 1
2018-09-03 06:34:19,216 INFO  org.apache.flink.yarn.YarnJobManager                          - Task manager akka.tcp://flink@hello-world9-27-crz.ops.sfdc.net:46219/user/taskmanager terminated.
2018-09-03 06:34:19,218 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from state RUNNING to FAILING.
java.lang.Exception: TaskManager was lost/killed: container_e27_1535135887442_1381_01_000102 @ hello-world9-27-crz.ops.sfdc.net (dataPort=40423)
2018-09-03 06:34:19,466 INFO  org.apache.flink.runtime.instance.InstanceManager             - Unregistered task manager hello-world9-27-crz.ops.sfdc.net/11.11.35.220. Number of registered task managers 144. Number of available slots 720

2018-09-03 06:34:24,717 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Received new container: container_e28_1535135887442_1381_01_000147 - Remaining pending container requests: 0
2018-09-03 06:34:24,717 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Launching TaskManager in container ContainerInLaunch @ 1535956464717: Container: [ContainerId: container_e28_1535135887442_1381_01_000147, NodeId: hello-world9-27-crz.ops.sfdc.net:8041, NodeHttpAddress: hello-world9-27-crz.ops.sfdc.net:8042, Resource: <memory:20480, vCores:5>, Priority: 0, Token: Token { kind: ContainerToken, service: 11.11.35.220:8041 }, ] on host hello-world9-27-crz.ops.sfdc.net
2018-09-03 06:34:29,256 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@hello-world9-27-crz.ops.sfdc.net:46219] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@hello-world9-27-crz.ops.sfdc.net:46219]] Caused by: [Connection refused: hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219]
2018-09-03 06:34:34,284 WARN  akka.remote.transport.netty.NettyTransport                    - Remote connection to [null] failed with java.net.ConnectException: Connection refused: hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219
2018-09-03 06:34:34,284 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@hello-world9-27-crz.ops.sfdc.net:46219] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@hello-world9-27-crz.ops.sfdc.net:46219]] Caused by: [Connection refused: hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219]
2018-09-03 06:34:34,540 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from state RESTARTING to CREATED.

2018-09-03 06:34:34,284 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@hello-world9-27-crz.ops.sfdc.net:46219] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@hello-world9-27-crz.ops.sfdc.net:46219]] Caused by: [Connection refused: hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219]
2018-09-03 06:34:34,540 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from state RESTARTING to CREATED.
2018-09-03 06:34:35,044 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from state CREATED to RUNNING.
2018-09-03 06:34:35,195 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from state RUNNING to FAILING.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration. Task to schedule: < Attempt #3 (Source: Custom Source -> (Map -> where: (AND(OR(=(UPPER(RuleMatch), _UTF-16LE'BRO'), =(UPPER(RuleMatch), _UTF-16LE'PAN'), =(UPPER(RuleMatch), _UTF-16LE'OLYMPUS'), =(UPPER(RuleMatch), _UTF-16LE'BIT9'), =(UPPER(RuleMatch), _UTF-16LE'OSQUERY'), =(UPPER(RuleMatch), _UTF-16LE'ILLUMIO')), OR(cidrMatch(IPSource, _UTF-16LE'10.0.0.0/8'), cidrMatch(IPSource, _UTF-16LE'192.168.0.0/16')), IS NULL(csvLookup(_UTF-16LE'a353A000000jGZb_whitelist.csv', _UTF




2018-09-03 06:34:39,248 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager container_e28_1535135887442_1381_01_000147 has started.
2018-09-03 06:34:45,235 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Container container_e28_1535135887442_1381_01_000147 failed. Exit status: -102
2018-09-03 06:34:45,235 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Diagnostics for container container_e28_1535135887442_1381_01_000147 in state COMPLETE : exitStatus=-102 diagnostics=Container preempted by scheduler
2018-09-03 06:34:45,236 INFO  org.apache.flink.yarn.YarnJobManager                          - Task manager akka.tcp://flink@hello-world9-27-crz.ops.sfdc.net:41966/user/taskmanager terminated.
2018-09-03 06:34:45,236 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Total number of failed containers so far: 2
2018-09-03 06:34:45,236 INFO  org.apache.flink.runtime.instance.InstanceManager             - Unregistered task manager hello-world9-27-crz.ops.sfdc.net/11.11.35.220. Number of registered task managers 144. Number of available slots 720.

2018-09-03 06:34:45,236 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Received new container: container_e28_1535135887442_1381_01_000202 - Remaining pending container requests: 0
2018-09-03 06:34:45,236 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Launching TaskManager in container ContainerInLaunch @ 1535956485236: Container: [ContainerId: container_e28_1535135887442_1381_01_000202, NodeId: hello-world4-31-crz.ops.sfdc.net:8041, NodeHttpAddress: hello-world4-31-crz.ops.sfdc.net:8042, Resource: <memory:20480, vCores:5>, Priority: 0, Token: Token { kind: ContainerToken, service: 11.11.34.160:8041 }, ] on host hello-world4-31-crz.ops.sfdc.net
2018-09-03 06:34:45,241 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Received new container: container_e28_1535135887442_1381_01_000203 - Remaining pending container requests: 0
2018-09-03 06:34:45,241 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Returning excess container container_e28_1535135887442_1381_01_000203

Notice there is no "TaskManager container_e28_1535135887442_1381_01_000202 has started" message for the replacement container.
Instead, I see:
2018-09-03 06:34:56,894 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from state FAILING to FAILED.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration. Task to schedule: < Attempt #5 (Source: Custom Source -> (Map -> where: (AND(OR(=(UPPER(RuleMatch), _UTF-16LE'BRO'), =(UPPER(RuleMatch), _UTF-16LE'PAN'), =(UPPER(RuleMatch), _UTF-16LE'OLYMPUS'), =(UPPER(RuleMatch), _UTF-16LE'BIT9'), =(UPPER(R


2018-09-03 06:34:57,005 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Shutting down cluster with status FAILED : The monitored job with ID 96d3b4f60a80a898f44f87c5b06f6981 has failed to complete.
2018-09-03 06:34:57,007 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Unregistering application from the YARN Resource Manager
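
As an aside, the two knobs the NoResourceAvailableException message points at live in flink-conf.yaml. A minimal sketch (the slot count matches the 720 slots / 144 TaskManagers reported above; the parallelism value is only illustrative, not our actual setting):

# flink-conf.yaml (sketch)
# 145 TaskManagers x 5 slots = 725 slots when the full set is up;
# losing one TM leaves 720, and the job then fails to find free slots.
taskmanager.numberOfTaskSlots: 5
# Illustrative only; the real job defines its own per-query parallelism.
parallelism.default: 720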




On Fri, Aug 31, 2018 at 4:00 PM, Subramanya Suresh <[hidden email]> wrote:
Thanks Till,
I do not see any Akka-related messages in that TaskManager after the initial startup; it seemed like all was well. After the RemoteWatcher detects it as unreachable and the TaskManager is unregistered, I do not see any other activity in the JobManager with regard to reallocation etc.
- Does the quarantining of the TaskManager not happen until taskmanager.exit-on-fatal-akka-error is turned on?
- Do the JobManager or the TaskManager try to reconnect to each other again? Is there a separate setting for that? (See the sketch after these questions.)
- Does the JobManager not reallocate a TaskManager, despite it being unregistered, until the TaskManager process exits? I think it should, especially if it is not trying to establish a connection again.
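
On the second question, the settings I am aware of that influence how quickly the RemoteWatcher declares a TM unreachable are the Akka DeathWatch options in flink-conf.yaml. A sketch with example values only (not recommendations):

# flink-conf.yaml (sketch): Akka DeathWatch tuning, example values only
akka.watch.heartbeat.interval: 10 s
akka.watch.heartbeat.pause: 60 s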

I will give the flag a try. 

Sincerely, 


On Fri, Aug 31, 2018 at 2:53 AM, Till Rohrmann <[hidden email]> wrote:
Could you check whether akka.tcp://flink@blahabc.sfdc.net:123/user/taskmanager is still running, e.g. whether it tries to reconnect to the JobManager? If that is the case, then the container is still running and the YarnFlinkResourceManager thinks that everything is alright. You can make a TaskManager kill itself when it gets quarantined by setting `taskmanager.exit-on-fatal-akka-error: true` in the `flink-conf.yaml`.
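
In flink-conf.yaml that is a single line; a minimal sketch:

# flink-conf.yaml: let a quarantined TaskManager terminate itself, so that YARN
# reports the container as completed and the resource manager can request a
# replacement.
taskmanager.exit-on-fatal-akka-error: true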

Cheers,
Till

On Fri, Aug 31, 2018 at 10:43 AM Subramanya Suresh <[hidden email]> wrote:
Hi Till, 
Greatly appreciate your reply. 
We use version 1.4.2. I do not see anything unusual in the logs of the TM that was lost. Note: I have looked at many such failures and see the same pattern below.

The JM logs above had most of what I have, but below is what I see when I search for flink.yarn (our logs are huge otherwise, given the number of SQL queries we run). The gist: Akka detects the TM as unreachable, the TM is marked lost and unregistered by the JM, operators start failing with NoResourceAvailableException since there is one TM less, and 5 restart attempts later the job goes down.
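
For completeness, the FixedDelayRestartStrategy(maxNumberRestartAttempts=5, delayBetweenRestartAttempts=10000) shown in the log below corresponds roughly to the following flink-conf.yaml sketch (it may equally be set programmatically in the job; this is only the config-file form):

# flink-conf.yaml (sketch): fixed-delay restart, 5 attempts, 10 s between attempts
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 5
restart-strategy.fixed-delay.delay: 10 s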

………….

2018-08-29 23:02:41,216 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager container_e27_1535135887442_0906_01_000124 has started.

2018-08-29 23:02:50,095 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager container_e27_1535135887442_0906_01_000159 has started.

2018-08-29 23:02:50,409 INFO  org.apache.flink.yarn.YarnJobManager                          - Submitting job ab7e96a1659fbb4bfb3c6cd9bccb0335 (streaming-searches-prod).

2018-08-29 23:02:50,429 INFO  org.apache.flink.yarn.YarnJobManager                          - Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=5, delayBetweenRestartAttempts=10000) for ab7e96a1659fbb4bfb3c6cd9bccb0335.

2018-08-29 23:02:50,486 INFO  org.apache.flink.yarn.YarnJobManager                          - Running initialization on master for job streaming-searches-prod (ab7e96a1659fbb4bfb3c6cd9bccb0335).

2018-08-29 23:02:50,487 INFO  org.apache.flink.yarn.YarnJobManager                          - Successfully ran initialization on master in 0 ms.

2018-08-29 23:02:50,684 INFO  org.apache.flink.yarn.YarnJobManager                          - Using application-defined state backend for checkpoint/savepoint metadata: File State Backend @ hdfs://security-temp/savedSearches/checkpoint.

2018-08-29 23:02:50,920 INFO  org.apache.flink.yarn.YarnJobManager                          - Scheduling job ab7e96a1659fbb4bfb3c6cd9bccb0335 (streaming-searches-prod).

2018-08-29 23:12:05,240 INFO  org.apache.flink.yarn.YarnJobManager                          - Attempting to recover all jobs.

2018-08-29 23:12:05,716 INFO  org.apache.flink.yarn.YarnJobManager                          - There are 1 jobs to recover. Starting the job recovery.

2018-08-29 23:12:05,806 INFO  org.apache.flink.yarn.YarnJobManager                          - Attempting to recover job ab7e96a1659fbb4bfb3c6cd9bccb0335.

2018-08-29 23:12:07,308 INFO  org.apache.flink.yarn.YarnJobManager                          - Ignoring job recovery for ab7e96a1659fbb4bfb3c6cd9bccb0335, because it is already submitted.

2018-08-29 23:13:57,364 INFO  org.apache.flink.yarn.YarnJobManager                          - Task manager akka.tcp://flink@blahabc.sfdc.net:123/user/taskmanager terminated.

at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)

at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)

2018-08-29 23:13:58,645 INFO  org.apache.flink.yarn.YarnJobManager                          - Received message for non-existing checkpoint 1

2018-08-29 23:15:26,126 INFO  org.apache.flink.yarn.YarnJobManager                          - Job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335 is in terminal state FAILED. Shutting down session

2018-08-29 23:15:26,128 INFO  org.apache.flink.yarn.YarnJobManager                          - Stopping JobManager with final application status FAILED and diagnostics: The monitored job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.

2018-08-29 23:15:26,129 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Shutting down cluster with status FAILED : The monitored job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.

2018-08-29 23:15:26,146 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Unregistering application from the YARN Resource Manager

2018-08-29 23:15:31,363 INFO  org.apache.flink.yarn.YarnJobManager                          - Deleting yarn application files under hdfs://security-temp/user/sec-data-app/.flink/application_1535135887442_0906.

2018-08-29 23:15:31,370 INFO  org.apache.flink.yarn.YarnJobManager                          - Stopping JobManager akka.tcp://flink@blahxyz.sfdc.net:1235/user/jobmanager.

2018-08-29 23:15:41,225 ERROR org.apache.flink.yarn.YarnJobManager                          - Actor system shut down timed out.

2018-08-29 23:15:41,226 INFO  org.apache.flink.yarn.YarnJobManager                          - Shutdown completed. Stopping JVM.



On Thu, Aug 30, 2018 at 1:55 AM, Till Rohrmann <[hidden email]> wrote:
Hi Subramanya,

in order to help you I need a little bit of context: which version of Flink are you running? The configuration yarn.reallocate-failed has been deprecated since Flink 1.1 and no longer has any effect.

What would be helpful is the full JobManager log from you. If the YarnFlinkResourceManager gets notified that a container has failed, it should restart this container (it will do this up to 145 times). If the YarnFlinkResourceManager does not get notified about a completed container, that might indicate the container is still running, so it would be good to check what the logs of container_e27_1535135887442_0906_01_000039 say.
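
(For reference, the 145-container tolerance mentioned in the original mail comes from yarn.maximum-failed-containers; when it is not set, it defaults to the number of requested TaskManagers. A sketch of setting it explicitly, with an arbitrary example value:)

# flink-conf.yaml (sketch): how many failed containers the YARN session tolerates
# before giving up. 290 here is just an example value, not a recommendation.
yarn.maximum-failed-containers: 290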

Moreover, do you see the same problem occur when using the latest release Flink 1.6.0?

Cheers,
Till

On Thu, Aug 30, 2018 at 8:48 AM Subramanya Suresh <[hidden email]> wrote:

Hi, we are seeing a weird issue where one TaskManager is lost and then never re-allocated and subsequently operators fail with NoResourceAvailableException and after 5 restarts (we have FixedDelay restarts of 5) the application goes down. 

  • We have explicitly set yarn.reallocate-failed: true and have not specified the yarn.maximum-failed-containers (and see “org.apache.flink.yarn.YarnApplicationMasterRunner             - YARN application tolerates 145 failed TaskManager containers before giving up” in the logs). 
  • After the initial startup where all 145 TaskManagers are requested I never see any logs saying “Requesting new TaskManager container” to reallocate failed container. 


--