Hello,
I'm currently trying to check whether my job is restarted in case of a JobManager failure. The job is submitted as a single job on YARN with the following options set in flink-conf.yaml:

    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 3
    restart-strategy.fixed-delay.delay: 10 s

Then I kill the JobManager container. YARN then starts a new JobManager container, but the job is not restarted. What am I doing wrong? Do I need to configure anything else to enable job restarts on JobManager failure?
I'm using Flink 1.3 and Hadoop 2.6.
Thanks in advance.
Kind Regards,
Mike Pryakhin
Hi Mike,
have you configured ZooKeeper [1]? AFAIK, it is required for a high-availability (YARN) session and is used to store the JobManager state. Without it, a recovery would not know what to recover from.
Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/jobmanager_high_availability.html#yarn-cluster-high-availability

On Tuesday, 20 June 2017 13:23:35 CEST Mikhail Pryakhin wrote:
> [...]
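For reference, here is a minimal flink-conf.yaml sketch that combines the restart strategy from the original question with ZooKeeper-based JobManager high availability for a job running on YARN. The key names follow the Flink 1.3 high-availability documentation linked in the reply; the ZooKeeper host names, the HDFS storage path, and the attempt count are placeholder values (assumptions), not settings taken from this thread.

```yaml
# Restart strategy from the original question:
# restart the job up to 3 times, waiting 10 s between attempts.
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s

# ZooKeeper-based JobManager high availability (Flink 1.3 key names).
high-availability: zookeeper
# Placeholder quorum: replace with your own ZooKeeper hosts.
high-availability.zookeeper.quorum: zk-host-1:2181,zk-host-2:2181,zk-host-3:2181
# Placeholder path: durable storage for the metadata needed to recover the
# JobManager; ZooKeeper itself only keeps pointers to this data.
high-availability.storageDir: hdfs:///flink/recovery
# Root ZNode under which Flink keeps its coordination data (default: /flink).
high-availability.zookeeper.path.root: /flink

# How many times YARN may start the ApplicationMaster (JobManager) container.
# YARN additionally caps this via its own yarn.resourcemanager.am.max-attempts.
yarn.application-attempts: 10
```

Without the `high-availability` settings, a JobManager container restarted by YARN has no persisted job metadata to recover from, which is consistent with the behaviour described in the question.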
Hi Nico,
Thanks for your reply!
With the ZooKeeper-related properties configured, everything works smoothly! I was confused because the documentation describes the high-availability configuration for YARN session mode. Anyway, thanks a lot!
Now I've noticed another problem: when I kill the JobManager, it gets restarted by YARN, and the following stack traces appear in the JobManager log:

    2017-06-20 16:44:53,843 WARN  org.apache.flink.runtime.webmonitor.JobManagerRetriever - Failed to retrieve leader gateway and port.
    akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@dmpkit-dev-dn2:24660/), Path(/user/jobmanager)]
        at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
        at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
        at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
        at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73)
        at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
        at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:120)
        at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
        at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:266)
        at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:533)
        at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:569)
        at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:559)
        at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
        at akka.remote.EndpointWriter.postStop(Endpoint.scala:583)
        at akka.actor.Actor$class.aroundPostStop(Actor.scala:477)
        at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:437)
        at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
        at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
        at akka.actor.ActorCell.terminate(ActorCell.scala:369)
        at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
        at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
        at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    2017-06-20 16:44:53,932 INFO  org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl - Received new token for : dmpkit-dev-dn2:8041
    2017-06-20 16:44:53,937 INFO  org.apache.flink.yarn.YarnFlinkResourceManager - Retrieved 1 TaskManagers from previous attempt
    2017-06-20 16:44:53,948 INFO  org.apache.flink.yarn.YarnFlinkResourceManager - Trying to associate with JobManager leader akka.tcp://flink@dmpkit-dev-dn2:24660/user/jobmanager
    2017-06-20 16:45:03,970 ERROR org.apache.flink.yarn.YarnFlinkResourceManager - Resource manager could not register at JobManager
    akka.pattern.AskTimeoutException: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@dmpkit-dev-dn2:24660/), Path(/user/jobmanager)]] after [10000 ms]
        at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
        at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
        at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
        at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
        at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
        at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
        at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
        at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
        at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
        at java.lang.Thread.run(Thread.java:745)
    2017-06-20 16:45:03,971 INFO  org.apache.flink.yarn.YarnFlinkResourceManager - Trying to associate with JobManager leader akka.tcp://flink@dmpkit-dev-dn2:24660/user/jobmanager
    2017-06-20 16:45:03,975 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@dmpkit-dev-dn2:24660] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@dmpkit-dev-dn2:24660]] Caused by: [Connection refused: dmpkit-dev-dn2/136.243.170.205:24660]
    2017-06-20 16:45:13,989 ERROR org.apache.flink.yarn.YarnFlinkResourceManager - Resource manager could not register at JobManager
    akka.pattern.AskTimeoutException: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@dmpkit-dev-dn2:24660/), Path(/user/jobmanager)]] after [10000 ms]
        (same stack trace as at 16:45:03,970)
    2017-06-20 16:45:13,990 INFO  org.apache.flink.yarn.YarnFlinkResourceManager - Trying to associate with JobManager leader akka.tcp://flink@dmpkit-dev-dn2:24660/user/jobmanager
    2017-06-20 16:45:13,993 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@dmpkit-dev-dn2:24660] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@dmpkit-dev-dn2:24660]] Caused by: [Connection refused: dmpkit-dev-dn2/136.243.170.205:24660]
    2017-06-20 16:45:18,174 ERROR org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler - Caught exception
    akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@dmpkit-dev-dn2:24660/), Path(/user/jobmanager)]
        (same stack trace as at 16:44:53,843)
    2017-06-20 16:45:24,010 ERROR org.apache.flink.yarn.YarnFlinkResourceManager - Resource manager could not register at JobManager
    akka.pattern.AskTimeoutException: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@dmpkit-dev-dn2:24660/), Path(/user/jobmanager)]] after [10000 ms]
        (same stack trace as at 16:45:03,970)
    2017-06-20 16:45:24,010 INFO  org.apache.flink.yarn.YarnFlinkResourceManager - Trying to associate with JobManager leader akka.tcp://flink@dmpkit-dev-dn2:24660/user/jobmanager
    2017-06-20 16:45:24,014 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@dmpkit-dev-dn2:24660] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@dmpkit-dev-dn2:24660]] Caused by: [Connection refused: dmpkit-dev-dn2/136.243.170.205:24660]
    2017-06-20 16:45:24,772 ERROR org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler - Caught exception
    akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@dmpkit-dev-dn2:24660/), Path(/user/jobmanager)]
        (same stack trace as at 16:44:53,843)
    2017-06-20 16:45:34,029 ERROR org.apache.flink.yarn.YarnFlinkResourceManager - Resource manager could not register at JobManager
    akka.pattern.AskTimeoutException: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@dmpkit-dev-dn2:24660/), Path(/user/jobmanager)]] after [10000 ms]
        (same stack trace as at 16:45:03,970)
    2017-06-20 16:45:34,030 INFO  org.apache.flink.yarn.YarnFlinkResourceManager - Trying to associate with JobManager leader akka.tcp://flink@dmpkit-dev-dn2:24660/user/jobmanager

Eventually the JobManager starts and works properly; I just wanted to understand the cause of the error.
Kind Regards,
Mike Pryakhin
My best guess is that the ResourceManager is still trying to connect to the JobManager that failed: the log shows it repeatedly trying the old leader address akka.tcp://flink@dmpkit-dev-dn2:24660/user/jobmanager and getting "Connection refused". After all, how should it know whether this is a temporary network failure or a permanent one? If the errors stop once your new JobManager has started, I'd say you don't have to worry about these messages. Till (cc'd) may be able to elaborate on this.
Nico

On Tuesday, 20 June 2017 17:06:00 CEST Mikhail Pryakhin wrote:
> [...]
> > On 20 Jun 2017, at 17:34, Nico Kruber wrote:
> > [...]
Thanks a lot, Nico!
It is definitely a permanent failure; I've tried multiple times and always got the same behaviour. And, as you said, the errors stop right after the new Job Manager has started.
I also checked the Task Manager logs and found a huge number of the following warnings:

2017-06-20 18:38:45,087 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@dmpkit-dev-dn2:30086] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@dmpkit-dev-dn2:30086]] Caused by: [Connection refused: dmpkit-dev-dn2/136.243.170.205:30086]
2017-06-20 18:38:50,105 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@dmpkit-dev-dn2:30086] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@dmpkit-dev-dn2:30086]] Caused by: [Connection refused: dmpkit-dev-dn2/136.243.170.205:30086]
2017-06-20 18:38:55,125 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@dmpkit-dev-dn2:30086] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@dmpkit-dev-dn2:30086]] Caused by: [Connection refused: dmpkit-dev-dn2/136.243.170.205:30086]

The warnings above finally end with this error message:

2017-06-20 18:41:30,744 ERROR Remoting - Association to [akka.tcp://flink@dmpkit-dev-dn2:30086] with UID [77777149] irrecoverably failed. Quarantining address.
java.util.concurrent.TimeoutException: Delivery of system messages timed out and they were dropped.
    at akka.remote.ReliableDeliverySupervisor$$anonfun$gated$1.applyOrElse(Endpoint.scala:336)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:189)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

After that, multiple INFO lines like this one follow:

2017-06-20 18:47:40,453 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking the coordinator 2147483480 dead.

Kind Regards,
Mike Pryakhin
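To check whether the gating warnings for an address stop on their own or end in quarantine, the Task Manager log can be summarized per remote address. The Python sketch below is a hypothetical helper, not part of Flink: the regular expressions assume exactly the log layout shown in the excerpt above (timestamp, level, logger, `-`, message), and `summarize` and `AddressSummary` are names made up for illustration.

```python
import re
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Iterable, Optional

# Building blocks matching the layout of the log excerpt above (assumed format).
TS = r"(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})"
ADDR = r"\[(?P<addr>akka\.tcp://[^\]]+)\]"

# "address is now gated" WARN lines logged by akka.remote.ReliableDeliverySupervisor.
GATED_RE = re.compile(
    TS + r"\s+WARN\s+akka\.remote\.ReliableDeliverySupervisor\s+-\s+"
    r"Association with remote system " + ADDR
    + r" has failed, address is now gated for \[(?P<gate_ms>\d+)\] ms"
)
# Final "Quarantining address" ERROR line logged by the Remoting logger.
QUARANTINE_RE = re.compile(
    TS + r"\s+ERROR\s+Remoting\s+-\s+Association to " + ADDR
    + r".*irrecoverably failed\. Quarantining address"
)


def parse_ts(text: str) -> datetime:
    # Timestamps use a comma before the milliseconds, e.g. "2017-06-20 18:38:45,087".
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S,%f")


@dataclass
class AddressSummary:
    gated_count: int = 0
    gate_ms: Optional[int] = None
    first_gated: Optional[datetime] = None
    last_gated: Optional[datetime] = None
    quarantined_at: Optional[datetime] = None


def summarize(lines: Iterable[str]) -> Dict[str, AddressSummary]:
    """Group gating warnings and quarantine errors by remote Akka address."""
    summaries: Dict[str, AddressSummary] = {}
    for line in lines:
        m = GATED_RE.search(line)
        if m:
            s = summaries.setdefault(m["addr"], AddressSummary())
            ts = parse_ts(m["ts"])
            s.gated_count += 1
            s.gate_ms = int(m["gate_ms"])
            if s.first_gated is None:
                s.first_gated = ts
            s.last_gated = ts
            continue
        m = QUARANTINE_RE.search(line)
        if m:
            s = summaries.setdefault(m["addr"], AddressSummary())
            s.quarantined_at = parse_ts(m["ts"])
    return summaries
```

Passing the lines of a Task Manager log to `summarize()` yields, for each remote address, how many gating warnings were seen, the gate interval (5000 ms in the excerpt above), the first and last warning times and, if the association was given up, the quarantine time.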
Yes, this supports my theory: both the ResourceManager and the TaskManager instances are trying to reconnect to the old JobManager to re-establish a link in case of temporary failures.
* In the best case, the connection is re-established and everything continues.
* Since your failure is permanent, this is not the case, and after some retries Flink reaches that conclusion as well ;)
We can't really avoid the warnings, though, even in an HA setup, since the administrator might want to know what is happening and what Flink is doing under the hood.
Everything's fine then.
Nico
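Nico's explanation amounts to a retry-until-deadline loop. As a rough, hypothetical model only (this is not Flink or Akka code), the Python sketch below retries a connection once per gate interval on a simulated clock and gives up after a deadline. `reconnect`, `try_connect` and `give_up_after_ms` are invented names; the 5000 ms gate matches the log lines quoted in this thread, while the deadline is an arbitrary parameter, since the thread does not state the value Akka actually uses.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class ReconnectResult:
    connected: bool
    attempts: int
    elapsed_ms: int  # simulated time of the last attempt


def reconnect(try_connect: Callable[[int], bool],
              gate_ms: int,
              give_up_after_ms: int) -> ReconnectResult:
    """Hypothetical model: retry once per gate interval on a simulated clock.

    A transient failure ends with a successful attempt; a permanent one
    exhausts the deadline, after which the peer is treated as gone (roughly
    the point where the real logs show "Quarantining address").
    """
    if gate_ms <= 0:
        raise ValueError("gate_ms must be positive")
    now_ms = 0
    attempts = 0
    while True:
        attempts += 1
        if try_connect(now_ms):
            return ReconnectResult(True, attempts, now_ms)
        if now_ms + gate_ms > give_up_after_ms:
            # The next attempt would fall past the deadline: give up for good.
            return ReconnectResult(False, attempts, now_ms)
        # The address stays gated; no new attempt until gate_ms has passed.
        now_ms += gate_ms
```

For example, with a 5000 ms gate, a peer that becomes reachable again after 12 s is reached on the fourth attempt (at 15 s), whereas a peer that never comes back yields `connected=False`, which corresponds to the warnings finally stopping with the quarantine error.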
Many thanks, Nico!! Everything is clear now.
Kind Regards,
Mike Pryakhin