New received containers silently lost during job auto restarts


New received containers silently lost during job auto restarts

Paul Lam
Hi,

One of my Flink on YARN jobs got into a weird situation after a fail-fast restart. The restart was triggered by the loss of a TaskManager, but while the job was recovering, one of its subtasks (1/24) couldn’t be deployed and eventually failed with a NoResourceAvailableException.

Looking into the logs, I found that the JobManager was requesting containers for the subtask again and again. It did receive a container in each round of requests, but the container seemed to be lost silently (with no error logs) or was never registered.

In the end, I canceled the job and resubmitted it, and the problem was gone.

I thought something might be wrong with the TaskManager initialization, but if that were the case, there would be some errors in the JobManager’s log, right?

Sorry for not being able to reproduce the problem, but does anyone have any idea about this? Thanks a lot!
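In case it helps with diagnosis: assuming log aggregation is enabled on the cluster, the aggregated application logs could be searched for one of the containers that was received but apparently never registered, e.g.:

```
# Hypothetical diagnostic, using IDs from the log excerpts below; YARN log
# aggregation must be enabled (logs are usually complete only after the
# application finishes).
yarn logs -applicationId application_1536852167599_809133 \
    | grep -A 20 container_1536852167599_809133_02_000832
```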

This is the cause of the job restart:
```
2018-09-27 12:33:11,639 INFO  org.apache.flink.yarn.YarnResourceManager                     - Closing TaskExecutor connection container_1536852167599_809133_02_000016 because: Container released on a *lost* node
2018-09-27 12:33:11,639 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Unregister TaskManager 554e53380a836229ecbc4ccae58f9b0f from the SlotManager.
2018-09-27 12:33:11,639 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink: LogKafkaSink (11/24) (51fedf378844bc7e5d2855f59baafcd0) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: The assigned slot container_1536852167599_809133_02_000016_0 was removed.
        at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:786)
        at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:756)
        at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:948)
        at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:372)
        at org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:802)
        at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:339)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
        at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
        at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
        at akka.actor.ActorCell.invoke(ActorCell.scala:495)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```

And then JobManager kept requesting and returning containers before timeout:
```
2018-09-27 12:38:11,072 INFO  org.apache.flink.yarn.YarnResourceManager                     - Received new container: container_1536852167599_809133_02_000832 - Remaining pending container requests: 1
2018-09-27 12:38:11,072 INFO  org.apache.flink.yarn.YarnResourceManager                     - Adding keytab hdfs://horton2/user/gdc_sa/.flink/application_1536852167599_809133/gdc_sa.keytab to the AM container local resource bucket
2018-09-27 12:38:11,338 INFO  org.apache.flink.yarn.YarnResourceManager                     - Creating container launch context for TaskManagers
2018-09-27 12:38:11,338 INFO  org.apache.flink.yarn.YarnResourceManager                     - Starting TaskManagers
2018-09-27 12:38:11,340 INFO  org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy  - Opening proxy : gdc-dn254:8041
2018-09-27 12:38:11,346 INFO  org.apache.flink.yarn.YarnResourceManager                     - Received new container: container_1536852167599_809133_02_000833 - Remaining pending container requests: 0
2018-09-27 12:38:11,346 INFO  org.apache.flink.yarn.YarnResourceManager                     - Returning excess container container_1536852167599_809133_02_000833.
2018-09-27 12:38:11,346 INFO  org.apache.flink.yarn.YarnResourceManager                     - Received new container: container_1536852167599_809133_02_000834 - Remaining pending container requests: 0
2018-09-27 12:38:11,346 INFO  org.apache.flink.yarn.YarnResourceManager                     - Returning excess container container_1536852167599_809133_02_000834.
2018-09-27 12:38:11,346 INFO  org.apache.flink.yarn.YarnResourceManager                     - Received new container: container_1536852167599_809133_02_000835 - Remaining pending container requests: 0
2018-09-27 12:38:11,346 INFO  org.apache.flink.yarn.YarnResourceManager                     - Returning excess container container_1536852167599_809133_02_000835.
———————— (similar log lines omitted) ————————
2018-09-27 12:43:15,449 INFO  org.apache.flink.yarn.YarnResourceManager                     - Received new container: container_1536852167599_809133_02_002284 - Remaining pending container requests: 1
2018-09-27 12:43:15,449 INFO  org.apache.flink.yarn.YarnResourceManager                     - Adding keytab hdfs://horton2/user/gdc_sa/.flink/application_1536852167599_809133/gdc_sa.keytab to the AM container local resource bucket
2018-09-27 12:43:15,725 INFO  org.apache.flink.yarn.YarnResourceManager                     - Creating container launch context for TaskManagers
2018-09-27 12:43:15,725 INFO  org.apache.flink.yarn.YarnResourceManager                     - Starting TaskManagers
2018-09-27 12:43:15,727 INFO  org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy  - Opening proxy : gdc-dn521:8041
2018-09-27 12:43:15,733 INFO  org.apache.flink.yarn.YarnResourceManager                     - Received new container: container_1536852167599_809133_02_002285 - Remaining pending container requests: 0
2018-09-27 12:43:15,733 INFO  org.apache.flink.yarn.YarnResourceManager                     - Returning excess container container_1536852167599_809133_02_002285.
2018-09-27 12:43:15,733 INFO  org.apache.flink.yarn.YarnResourceManager                     - Received new container: container_1536852167599_809133_02_002286 - Remaining pending container requests: 0
2018-09-27 12:43:15,734 INFO  org.apache.flink.yarn.YarnResourceManager                     - Received new container: container_1536852167599_809133_02_002351 - Remaining pending container requests: 0
2018-09-27 12:43:15,734 INFO  org.apache.flink.yarn.YarnResourceManager                     - Received new container: container_1536852167599_809133_02_002352 - Remaining pending container requests: 0
2018-09-27 12:43:15,734 INFO  org.apache.flink.yarn.YarnResourceManager                     - Returning excess container container_1536852167599_809133_02_002352.
2018-09-27 12:43:15,734 INFO  org.apache.flink.yarn.YarnResourceManager                     - Received new container: container_1536852167599_809133_02_002353 - Remaining pending container requests: 0
2018-09-27 12:43:15,734 INFO  org.apache.flink.yarn.YarnResourceManager                     - Returning excess container container_1536852167599_809133_02_002353.
2018-09-27 12:43:16,831 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint triggering task Source: Custom Source (1/24) of job 4f6c0c7a0c09254ff6c1de9bc92dc9e7 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
2018-09-27 12:43:21,831 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint triggering task Source: Custom Source (1/24) of job 4f6c0c7a0c09254ff6c1de9bc92dc9e7 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
2018-09-27 12:43:21,833 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Pending slot request [SlotRequestId{7c611bb7a1cb28c061ada9c485b116ac}] timed out.
2018-09-27 12:43:21,837 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job LogStream (4f6c0c7a0c09254ff6c1de9bc92dc9e7) switched from state RUNNING to FAILING.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate all requires slots within timeout of 300000 ms. Slots required: 72, slots allocated: 69
        at org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$scheduleEager$3(ExecutionGraph.java:984)
        at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
        at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
        at org.apache.flink.runtime.concurrent.FutureUtils$ResultConjunctFuture.handleCompletedFuture(FutureUtils.java:534)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
        at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:770)
        at akka.dispatch.OnComplete.internal(Future.scala:258)
        at akka.dispatch.OnComplete.internal(Future.scala:256)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
        at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
        at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
        at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20)
        at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)
        at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
        at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
        at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
        at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
        at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
        at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
        at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
        at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```

Best,
Paul Lam

Re: New received containers silently lost during job auto restarts

Kostas Kloudas
Hi Paul,

I am also cc’ing Till and Gary, who may be able to help. To give them more information,
it would help if you told us which Flink version you are using.

Cheers,
Kostas



Re: New received containers silently lost during job auto restarts

Paul Lam
Hi Kostas,

Sorry, I forgot to mention that. I'm using Flink 1.5.3.

Best,
Paul Lam



Re: New received containers silently lost during job auto restarts

Paul Lam
Hi everyone,

Today I got this error again in another job, and I found some logs indicating that it’s probably related to the delegation token.

```
2018-09-28 09:49:09,668 INFO  org.apache.flink.yarn.YarnResourceManager                     - Closing TaskExecutor connection container_1536852167599_1262075_03_000010 because: token (HDFS_DELEGATION_TOKEN token 3647547 for gdc_sa) can't be found in cache
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 3647547 for gdc_sa) can't be found in cache
        at org.apache.hadoop.ipc.Client.call(Client.java:1466)
        at org.apache.hadoop.ipc.Client.call(Client.java:1403)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
        at com.sun.proxy.$Proxy15.getFileInfo(Unknown Source)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:757)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:256)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
        at com.sun.proxy.$Proxy16.getFileInfo(Unknown Source)
        at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2102)
        at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1214)
        at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1210)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1210)
        at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:251)
        at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:61)
        at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
        at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:357)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1707)
        at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:356)
        at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:60)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
```

In addition, the failed subtask is always the first one (subtask 1). I guess that’s because it’s the first one to run and therefore the first one to hit this problem.
But since I’m submitting the job with my keytab, it should not be affected by the delegation token.
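For reference, the keytab-related settings look roughly like the following in flink-conf.yaml (the path and principal here are placeholders, not the real values):

```
# flink-conf.yaml (placeholder values)
security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /path/to/gdc_sa.keytab
security.kerberos.login.principal: gdc_sa@EXAMPLE.COM
```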

Now I have three questions:
1. Why would a missing delegation token in the cache make a subtask fail?
2. Why can’t it be recovered after a job auto restart? The subtask gets stuck in SCHEDULED status and seems never to be deployed.
3. What happens if a subtask can’t be deployed on a newly received container? Would the JobManager keep grabbing more and more containers and consider the situation normal until the NoResourceAvailableException is thrown?

I will keep working on it because it’s critical to me, and any help is highly appreciated! Thank you!

Best,
Paul Lam




Re: New received containers silently lost during job auto restarts

Till Rohrmann
Hi Paul,

this looks to me like a Yarn setup problem. Could you check which value you have set for dfs.namenode.delegation.token.max-lifetime? By default it is set to 7 days.
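For reference, that property lives in hdfs-site.xml on the NameNode; with the default it would look like this (the value is in milliseconds):

```
<!-- hdfs-site.xml: maximum lifetime of an HDFS delegation token.
     604800000 ms = 7 days, the Hadoop default. -->
<property>
  <name>dfs.namenode.delegation.token.max-lifetime</name>
  <value>604800000</value>
</property>
```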

The reason Flink needs to access HDFS is that the binaries and configuration files for a Yarn application are stored there. Thus, you also need read access; otherwise Flink cannot start new TaskManagers.
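You could verify that the staged files are still readable, for example using the staging path from your logs:

```
# The application's staging directory from the log excerpts above; all files
# in it must be readable by the user that starts the TaskManagers.
hdfs dfs -ls hdfs://horton2/user/gdc_sa/.flink/application_1536852167599_809133/
```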

If starting a TaskManager does not succeed, Flink will try to allocate a new container and start another TaskManager. This continues until the slot request timeout is triggered, causing the NoResourceAvailableException to be thrown.
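The 300000 ms in your NoResourceAvailableException is exactly this slot request timeout. If needed, it can be raised in flink-conf.yaml (shown here with what should be the default value):

```
# flink-conf.yaml: how long a pending slot request may stay unfulfilled
# before the job fails with a NoResourceAvailableException.
slot.request.timeout: 300000
```

Note, though, that raising the timeout would likely only delay the failure in your case; it would not fix the underlying token problem.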

Cheers,
Till



Re: New received containers silently lost during job auto restarts

Paul Lam
Hi Till,

Thanks for your reply!

We considered delegation token expiration, but since we submit the jobs with keytabs and some jobs have been running for weeks, that doesn't seem to be the cause.
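
For completeness, our keytab submission uses the standard flink-conf.yaml security keys, roughly like this (principal and path below are placeholders, not our real values):

```
# security.kerberos.login.keytab: /path/to/gdc_sa.keytab
# security.kerberos.login.principal: gdc_sa@EXAMPLE.COM
grep -E '^security\.kerberos\.login' "$FLINK_CONF_DIR/flink-conf.yaml"
```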

And the cause is probably the HDFS read access you mentioned (Hadoop 2.6.5-cdh-5.6.0 with Kerberos). I found that the newly received containers failed with final state `LOCALIZATION_FAILED` because they could not authenticate. The TaskManager was never started, so it's reasonable that the JobManager didn't report any errors.
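
In case it helps anyone hitting the same thing: the final state and diagnostics of such a container can be pulled with the YARN CLI (container and application ids below are taken from our earlier logs; `yarn logs` additionally requires log aggregation to be enabled):

```
# Diagnostics for a single container (works while the application attempt is live)
yarn container -status container_1536852167599_809133_02_000832
# Aggregated container logs after the fact (needs yarn.log-aggregation-enable=true)
yarn logs -applicationId application_1536852167599_809133
```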

But I still cannot tell whether it's a YARN problem or a Flink problem: the NodeManager's log shows that no credentials were provided in the container launch context, while judging from the JobManager's log the token should have been set; otherwise there would be some other error or info logs.

Now I suspect it's related to [HDFS-9276](https://issues.apache.org/jira/browse/HDFS-9276), but I'm not sure yet.
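
Since HDFS-9276 is specific to NameNode HA token handling (and our NodeManager trace below goes through ViewFileSystem), a first sanity check is whether an HA nameservice is in play and which exact build we run, to compare against the fix versions listed on the JIRA:

```
# A non-empty value means an HA/federated nameservice is configured
hdfs getconf -confKey dfs.nameservices
# Exact Hadoop build, to compare against HDFS-9276's fix versions
hadoop version | head -1
```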

Thank you very much!

NodeManager’s log:
```
java.io.IOException: Failed on local exception: java.io.IOException: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]; Host Details : local host is: "dn573/10.191.58.209"; destination host is: "nn02":9000;
    at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:772)
    at org.apache.hadoop.ipc.Client.call(Client.java:1470)
    at org.apache.hadoop.ipc.Client.call(Client.java:1403)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
    at com.sun.proxy.$Proxy22.getFileInfo(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:757)
    at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:256)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
    at com.sun.proxy.$Proxy23.getFileInfo(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2102)
    at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1214)
    at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1210)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1210)
    at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:424)
    at org.apache.hadoop.fs.viewfs.ChRootedFileSystem.getFileStatus(ChRootedFileSystem.java:224)
    at org.apache.hadoop.fs.viewfs.ViewFileSystem.getFileStatus(ViewFileSystem.java:359)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService.checkExists(LogAggregationService.java:248)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService.access$100(LogAggregationService.java:67)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService$1.run(LogAggregationService.java:276)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1707)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService.createAppDir(LogAggregationService.java:261)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService.initAppAggregator(LogAggregationService.java:366)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService.initApp(LogAggregationService.java:320)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService.handle(LogAggregationService.java:443)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService.handle(LogAggregationService.java:67)
    at org.apache.hadoop.yarn.event.AsyncDispatcher.dispatch(AsyncDispatcher.java:174)
    at org.apache.hadoop.yarn.event.AsyncDispatcher$1.run(AsyncDispatcher.java:106)
    at java.lang.Thread.run(Thread.java:745)
```

Best,
Paul Lam


