Can't get my job restarted on job manager failures

Mikhail Pryakhin-2
Hello,

 

I'm currently trying to check whether my job is restarted in case of Job Manager failure.
The job is submitted as a single job on YARN with the following options set in the flink-conf.yaml:

 

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s

 

Then I kill the Job Manager container. After that YARN starts a new Job Manager container but the job is not started.
What am I doing wrong? Do I need something else to be configured to enable job restarts on JM failure?

 

I'm using Flink 1.3 with Hadoop 2.6.

 

Thanks in advance.

Kind Regards,
Mike Pryakhin

Re: Can't get my job restarted on job manager failures

Nico Kruber
Hi Mike,
have you configured ZooKeeper [1]? AFAIK, it is required for a
high-availability (YARN) session and is used to store JobManager state.
Without it, a recovery would not know what to recover from.


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/jobmanager_high_availability.html#yarn-cluster-high-availability
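
For reference, here is a minimal sketch of what the ZooKeeper-related entries in flink-conf.yaml might look like for Flink 1.3 on YARN. The restart-strategy options from the original post only govern restarts after task failures; recovering a job after a JobManager failure additionally needs high-availability settings along these lines. The quorum hosts, storage directory, root path, and attempt count are placeholder assumptions, not values from this thread, and the key names should be checked against the linked documentation [1] for the version in use.

```yaml
# flink-conf.yaml (sketch; every value below is a placeholder)
high-availability: zookeeper
# Assumed ZooKeeper quorum; replace with your own hosts.
high-availability.zookeeper.quorum: zk-host-1:2181,zk-host-2:2181,zk-host-3:2181
# Assumed HDFS directory where JobManager recovery metadata is persisted.
high-availability.zookeeper.storageDir: hdfs:///flink/recovery
# Assumed root znode under which Flink keeps its cluster nodes.
high-availability.zookeeper.path.root: /flink
# How many times YARN may restart the application master (JobManager);
# capped by yarn.resourcemanager.am.max-attempts in yarn-site.xml.
yarn.application-attempts: 10
```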

Re: Can't get my job restarted on job manager failures

Mikhail Pryakhin-2
Hi Nico,
Thanks for your reply!

With the ZooKeeper-related properties configured, everything works smoothly!
I was confused because the docs describe the high-availability configuration for YARN session mode. Anyway, thanks a lot!

Now I have noticed another problem: when I kill the Job Manager, it gets restarted by YARN, and the following stack trace appears in the Job Manager log:

2017-06-20 16:44:53,843 WARN  org.apache.flink.runtime.webmonitor.JobManagerRetriever       - Failed to retrieve leader gateway and port.
akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@dmpkit-dev-dn2:24660/), Path(/user/jobmanager)]
	at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
	at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
	at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
	at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73)
	at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
	at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:120)
	at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:266)
	at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:533)
	at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:569)
	at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:559)
	at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
	at akka.remote.EndpointWriter.postStop(Endpoint.scala:583)
	at akka.actor.Actor$class.aroundPostStop(Actor.scala:477)
	at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:437)
	at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
	at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
	at akka.actor.ActorCell.terminate(ActorCell.scala:369)
	at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
	at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
	at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
	at akka.dispatch.Mailbox.run(Mailbox.scala:219)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2017-06-20 16:44:53,932 INFO  org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl         - Received new token for : dmpkit-dev-dn2:8041
2017-06-20 16:44:53,937 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Retrieved 1 TaskManagers from previous attempt
2017-06-20 16:44:53,948 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Trying to associate with JobManager leader akka.tcp://flink@dmpkit-dev-dn2:24660/user/jobmanager
2017-06-20 16:45:03,970 ERROR org.apache.flink.yarn.YarnFlinkResourceManager                - Resource manager could not register at JobManager
akka.pattern.AskTimeoutException: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@dmpkit-dev-dn2:24660/), Path(/user/jobmanager)]] after [10000 ms]
	at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
	at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
	at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
	at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
	at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
	at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
	at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
	at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
	at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
	at java.lang.Thread.run(Thread.java:745)
2017-06-20 16:45:03,971 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Trying to associate with JobManager leader akka.tcp://flink@dmpkit-dev-dn2:24660/user/jobmanager
2017-06-20 16:45:03,975 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@dmpkit-dev-dn2:24660] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@dmpkit-dev-dn2:24660]] Caused by: [Connection refused: dmpkit-dev-dn2/136.243.170.205:24660]
2017-06-20 16:45:13,989 ERROR org.apache.flink.yarn.YarnFlinkResourceManager                - Resource manager could not register at JobManager
akka.pattern.AskTimeoutException: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@dmpkit-dev-dn2:24660/), Path(/user/jobmanager)]] after [10000 ms]
	at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
	at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
	at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
	at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
	at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
	at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
	at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
	at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
	at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
	at java.lang.Thread.run(Thread.java:745)
2017-06-20 16:45:13,990 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Trying to associate with JobManager leader akka.tcp://flink@dmpkit-dev-dn2:24660/user/jobmanager
2017-06-20 16:45:13,993 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@dmpkit-dev-dn2:24660] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@dmpkit-dev-dn2:24660]] Caused by: [Connection refused: dmpkit-dev-dn2/136.243.170.205:24660]
2017-06-20 16:45:18,174 ERROR org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler  - Caught exception
akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@dmpkit-dev-dn2:24660/), Path(/user/jobmanager)]
	at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
	at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
	at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
	at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73)
	at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
	at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:120)
	at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:266)
	at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:533)
	at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:569)
	at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:559)
	at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
	at akka.remote.EndpointWriter.postStop(Endpoint.scala:583)
	at akka.actor.Actor$class.aroundPostStop(Actor.scala:477)
	at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:437)
	at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
	at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
	at akka.actor.ActorCell.terminate(ActorCell.scala:369)
	at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
	at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
	at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
	at akka.dispatch.Mailbox.run(Mailbox.scala:219)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2017-06-20 16:45:24,010 ERROR org.apache.flink.yarn.YarnFlinkResourceManager                - Resource manager could not register at JobManager
akka.pattern.AskTimeoutException: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@dmpkit-dev-dn2:24660/), Path(/user/jobmanager)]] after [10000 ms]
	at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
	at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
	at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
	at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
	at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
	at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
	at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
	at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
	at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
	at java.lang.Thread.run(Thread.java:745)
2017-06-20 16:45:24,010 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Trying to associate with JobManager leader akka.tcp://flink@dmpkit-dev-dn2:24660/user/jobmanager
2017-06-20 16:45:24,014 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@dmpkit-dev-dn2:24660] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@dmpkit-dev-dn2:24660]] Caused by: [Connection refused: dmpkit-dev-dn2/136.243.170.205:24660]
2017-06-20 16:45:24,772 ERROR org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler  - Caught exception
akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@dmpkit-dev-dn2:24660/), Path(/user/jobmanager)]
	at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
	at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
	at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
	at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73)
	at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
	at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:120)
	at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:266)
	at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:533)
	at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:569)
	at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:559)
	at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
	at akka.remote.EndpointWriter.postStop(Endpoint.scala:583)
	at akka.actor.Actor$class.aroundPostStop(Actor.scala:477)
	at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:437)
	at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
	at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
	at akka.actor.ActorCell.terminate(ActorCell.scala:369)
	at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
	at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
	at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
	at akka.dispatch.Mailbox.run(Mailbox.scala:219)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2017-06-20 16:45:34,029 ERROR org.apache.flink.yarn.YarnFlinkResourceManager                - Resource manager could not register at JobManager
akka.pattern.AskTimeoutException: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@dmpkit-dev-dn2:24660/), Path(/user/jobmanager)]] after [10000 ms]
	at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
	at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
	at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
	at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
	at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
	at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
	at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
	at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
	at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
	at java.lang.Thread.run(Thread.java:745)
2017-06-20 16:45:34,030 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Trying to associate with JobManager leader akka.tcp://flink@dmpkit-dev-dn2:24660/user/jobmanager


Eventually the Job Manager starts and works properly; I just wanted to understand the cause of the error.


Many thanks in advance!


Kind Regards,
Mike Pryakhin


Re: Can't get my job restarted on job manager failures

Nico Kruber
My best guess is that the resource manager is still trying to connect to
the JobManager that failed. After all, how should it know whether this is a
temporary network failure or a permanent one?

If the errors stop once your new JobManager has started, I'd say you
don't have to worry about the messages.
Till (cc'd) may elaborate a bit more on this.


Nico

> at
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.sca
> la:474) at
> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.sc
> ala:425) at
> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429
> ) at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
> at java.lang.Thread.run(Thread.java:745)
> 2017-06-20 16:45:34,030 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
>                - Trying to associate with JobManager leader
> akka.tcp://flink@dmpkit-dev-dn2:24660/user/jobmanager
>
>
> Finally, Job manager starts and works properly, just wanted to understand
> the cause of the error.
>
>
> Many thanks in advance!
>
>
> Kind Regards,
> Mike Pryakhin
>
> > On 20 Jun 2017, at 17:34, Nico Kruber <[hidden email]> wrote:
> >
> > Hi Mike,
> > have you configured zookeeper [1] ? afaik, it is required for a high-
> > availability (YARN) session and is used to store JobManager state. Without
> > it, a recovery would not know what to recover from.
> >
> >
> > Nico
> >
> > [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/
> > jobmanager_high_availability.html#yarn-cluster-high-availability
> >
> > On Tuesday, 20 June 2017 13:23:35 CEST Mikhail Pryakhin wrote:
> >> Hello,
> >>
> >> I'm currently trying to check whether my job is restarted in case of Job
> >> Manager failure. The job is submitted as a single job on YARN with the
> >> following options set in the flink-conf.yaml:
> >>
> >> restart-strategy: fixed-delay
> >> restart-strategy.fixed-delay.attempts: 3
> >> restart-strategy.fixed-delay.delay: 10 s
> >>
> >> Then I kill the Job Manager container. After that YARN starts a new Job
> >> Manager container but the job is not started. What am I doing wrong? Do I
> >> need something else to be configured to enable job restarts on JM
> >> failure?
> >>
> >> I'm using flink 1.3 Hadoop 2.6
> >>
> >> Thanks in advance.
> >>
> >> Kind Regards,
> >> Mike Pryakhin



Re: Can't get my job restarted on job manager failures

Mikhail Pryakhin-2
Thanks a lot, Nico!

It is definitely a permanent failure: I’ve tried multiple times and always got the same behaviour.
And as you said, the errors stop right after the new job manager has started.

I also checked the Task Manager logs and found a huge number of the following warnings:

2017-06-20 18:38:45,087 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@dmpkit-dev-dn2:30086] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@dmpkit-dev-dn2:30086]] Caused by: [Connection refused: dmpkit-dev-dn2/136.243.170.205:30086]
2017-06-20 18:38:50,105 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@dmpkit-dev-dn2:30086] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@dmpkit-dev-dn2:30086]] Caused by: [Connection refused: dmpkit-dev-dn2/136.243.170.205:30086]
2017-06-20 18:38:55,125 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@dmpkit-dev-dn2:30086] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@dmpkit-dev-dn2:30086]] Caused by: [Connection refused: dmpkit-dev-dn2/136.243.170.205:30086]

The warnings above finally end with this error message:

2017-06-20 18:41:30,744 ERROR Remoting                                                      - Association to [akka.tcp://flink@dmpkit-dev-dn2:30086] with UID [77777149] irrecoverably failed. Quarantining address.
java.util.concurrent.TimeoutException: Delivery of system messages timed out and they were dropped.
	at akka.remote.ReliableDeliverySupervisor$$anonfun$gated$1.applyOrElse(Endpoint.scala:336)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
	at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:189)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

After that, multiple INFO lines follow:

2017-06-20 18:47:40,453 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking the coordinator 2147483480 dead.





Kind Regards,
Mike Pryakhin

On 20 Jun 2017, at 18:27, Nico Kruber <[hidden email]> wrote:

My best guess here is that the resource manager is still trying to connect to
the JobManager which failed - after all, how should it know whether this is a
temporary network failure or a permanent failure?!

If, after your new JobManager starts, the errors stop as well, I'd say, you
don't have to worry about the messages.
Till (cc'd) may elaborate a bit more on this.


Nico



Re: Can't get my job restarted on job manager failures

Nico Kruber
Yes, this supports my theory: both the ResourceManager and the TaskManager
instances are trying to reconnect to the old JobManager to re-establish a link
in case of temporary failures.
* In the best case, the connection is re-established and everything continues.
* Since your failure is permanent, this is not the case, and after some
retries Flink comes to the same conclusion ;)
We can't really avoid the warnings though, even in an HA setup, since the
administrator might want to know what is happening and what Flink is doing
under the hood.
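
The reconnect behaviour described above can be sketched as a toy model. This is only an illustration under stated assumptions, not Akka's or Flink's actual code: the function `reconnect`, the `ReconnectResult` type and the `give_up_after_ms` budget are hypothetical names and values; only the 5000 ms gate interval is taken from the log lines in this thread. The clock is simulated, so nothing actually sleeps or opens a connection.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class ReconnectResult:
    attempts: int  # how many connection attempts were made
    outcome: str   # "reconnected" or "quarantined"


def reconnect(
    is_reachable: Callable[[int], bool],
    gate_ms: int = 5000,            # gate interval seen in the logs above
    give_up_after_ms: int = 60000,  # hypothetical retry budget
) -> ReconnectResult:
    """Retry a peer on a simulated clock until it answers or the budget runs out.

    is_reachable(t) says whether the peer would accept a connection
    t milliseconds after the first attempt.
    """
    elapsed_ms = 0
    attempts = 0
    while elapsed_ms < give_up_after_ms:
        attempts += 1
        if is_reachable(elapsed_ms):
            # Temporary failure: the link comes back and work continues.
            return ReconnectResult(attempts, "reconnected")
        # The address is gated: wait one interval before the next attempt.
        elapsed_ms += gate_ms
    # Permanent failure: stop retrying and treat the peer as gone.
    return ReconnectResult(attempts, "quarantined")


# A peer that comes back after 12 s versus one that never comes back.
temporary = reconnect(lambda t: t >= 12000, give_up_after_ms=20000)
permanent = reconnect(lambda t: False, give_up_after_ms=20000)
print(temporary)
print(permanent)
```

Each failed attempt in this model corresponds to one "address is now gated" warning, which is why a permanent failure produces a run of warnings before the final error.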

Everything's fine then.

Nico

On Tuesday, 20 June 2017 17:56:00 CEST Mikhail Pryakhin wrote:

> Thanks a lot, Nico!
>
> It is definitely a permanent failure, I’ve tried multiple times and always
> got the same behaviour. And as you said, right after the job manager has
> started the errors stop as well.
>
> I also checked the logs of the Task Manager and found a huge number of the
> following warnings:
>
> 2017-06-20 18:38:45,087 WARN  akka.remote.ReliableDeliverySupervisor        
>                - Association with remote system
> [akka.tcp://flink@dmpkit-dev-dn2:30086] has failed, address is now gated
> for [5000] ms. Reason: [Association failed with
> [akka.tcp://flink@dmpkit-dev-dn2:30086]] Caused by: [Connection refused:
> dmpkit-dev-dn2/136.243.170.205:30086] 2017-06-20 18:38:50,105 WARN
> akka.remote.ReliableDeliverySupervisor                        - Association
> with remote system [akka.tcp://flink@dmpkit-dev-dn2:30086] has failed,
> address is now gated for [5000] ms. Reason: [Association failed with
> [akka.tcp://flink@dmpkit-dev-dn2:30086]] Caused by: [Connection refused:
> dmpkit-dev-dn2/136.243.170.205:30086] 2017-06-20 18:38:55,125 WARN
> akka.remote.ReliableDeliverySupervisor                        - Association
> with remote system [akka.tcp://flink@dmpkit-dev-dn2:30086] has failed,
> address is now gated for [5000] ms. Reason: [Association failed with
> [akka.tcp://flink@dmpkit-dev-dn2:30086]] Caused by: [Connection refused:
> dmpkit-dev-dn2/136.243.170.205:30086]
>
> the warnings above finally finish with the error message
>
> 2017-06-20 18:41:30,744 ERROR Remoting                                      
>                - Association to [akka.tcp://flink@dmpkit-dev-dn2:30086]
> with UID [77777149] irrecoverably failed. Quarantining address.
> java.util.concurrent.TimeoutException: Delivery of system messages timed
> out and they were dropped. at
> akka.remote.ReliableDeliverySupervisor$$anonfun$gated$1.applyOrElse(Endpoin
> t.scala:336) at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
> at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:189)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDi
> spatcher.scala:397) at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:
> 1339) at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.jav
> a:107)
>
> After that multiple Info lines follow.
>
> 2017-06-20 18:47:40,453 INFO
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking
> the coordinator 2147483480 dead.
>
>
>
>
>
> Kind Regards,
> Mike Pryakhin
>
> > On 20 Jun 2017, at 18:27, Nico Kruber <[hidden email]> wrote:
> >
> > My best guess here is that the resource manager is still trying to connect
> > to the JobManager which failed - after all, how should it know whether
> > this is a temporary network failure or a permanent failure?!
> >
> > If, after your new JobManager starts, the errors stop as well, I'd say,
> > you
> > don't have to worry about the messages.
> > Till (cc'd) may elaborate a bit more on this.
> >
> >
> > Nico
> >
> > On Tuesday, 20 June 2017 17:06:00 CEST Mikhail Pryakhin wrote:
> >> Hi Niko,
> >> Thanks for your reply!
> >>
> >> Having zookeeper-related properties configured everything works smoothly!
> >> I was confused because the doc references high availability configuration
> >> for yarn session mode, anyway thanks a lot!
> >>
> >> Now I noticed another problem, when I kill the job manager, it then gets
> >> restarted by YARN, and the following stacktrace appears in the Job
> >> Manager
> >> log:
> >>
> >> 2017-06-20 16:44:53,843 WARN
> >> org.apache.flink.runtime.webmonitor.JobManagerRetriever       - Failed to
> >> retrieve leader gateway and port. akka.actor.ActorNotFound: Actor not
> >> found
> >> for: ActorSelection[Anchor(akka.tcp://flink@dmpkit-dev-dn2:24660/),
> >> Path(/user/jobmanager)] at
> >> akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scal
> >> a:
> >> 65) at
> >> akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scal
> >> a:
> >> 63) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> >>
> >> at
> >>
> >> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecuto
> >> r.
> >> scala:55) at
> >> akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73) at
> >> akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExec
> >> ut
> >> e(Future.scala:74) at
> >> akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:120)
> >> at
> >> akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Futur
> >> e.
> >> scala:73) at
> >> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
> >> at
> >> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:24
> >> 8)
> >> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:266)
> >>
> >> at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:533)
> >> at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:569)
> >> at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:559)
> >> at
> >>
> >> akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteA
> >> ct
> >> orRefProvider.scala:87) at
> >> akka.remote.EndpointWriter.postStop(Endpoint.scala:583)
> >>
> >> at akka.actor.Actor$class.aroundPostStop(Actor.scala:477)
> >> at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:437)
> >> at
> >>
> >> akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$
> >> fi
> >> nishTerminate(FaultHandling.scala:210) at
> >> akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
> >> at akka.actor.ActorCell.terminate(ActorCell.scala:369)
> >>
> >> at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
> >> at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
> >> at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
> >> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> >> at
> >>
> >> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(Abstract
> >> Di
> >> spatcher.scala:397) at
> >> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at
> >> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.jav
> >> a:
> >> 1339) at
> >> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >> at
> >> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.j
> >> av
> >> a:107) 2017-06-20 16:44:53,932 INFO
> >> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl         - Received
> >> new token for : dmpkit-dev-dn2:8041 2017-06-20 16:44:53,937 INFO
> >> org.apache.flink.yarn.YarnFlinkResourceManager                - Retrieved
> >> 1
> >> TaskManagers from previous attempt 2017-06-20 16:44:53,948 INFO
> >> org.apache.flink.yarn.YarnFlinkResourceManager                - Trying to
> >> associate with JobManager leader
> >> akka.tcp://flink@dmpkit-dev-dn2:24660/user/jobmanager 2017-06-20
> >> 16:45:03,970 ERROR org.apache.flink.yarn.YarnFlinkResourceManager
> >>
> >>     - Resource manager could not register at JobManager
> >>
> >> akka.pattern.AskTimeoutException: Ask timed out on
> >> [ActorSelection[Anchor(akka.tcp://flink@dmpkit-dev-dn2:24660/),
> >> Path(/user/jobmanager)]] after [10000 ms] at
> >> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334
> >> )
> >> at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
> >>
> >> at
> >>
> >> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future
> >> .s
> >> cala:599) at
> >> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:10
> >> 9)
> >> at
> >> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:59
> >> 7)
> >> at
> >> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.s
> >> ca
> >> la:474) at
> >> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.
> >> sc
> >> ala:425) at
> >> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:4
> >> 29
> >> ) at
> >> akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
> >> at java.lang.Thread.run(Thread.java:745)
> >> 2017-06-20 16:45:03,971 INFO
> >> org.apache.flink.yarn.YarnFlinkResourceManager>>
> >>               - Trying to associate with JobManager leader
> >>
> >> akka.tcp://flink@dmpkit-dev-dn2:24660/user/jobmanager 2017-06-20
> >> 16:45:03,975 WARN  akka.remote.ReliableDeliverySupervisor
> >>
> >>     - Association with remote system
> >>
> >> [akka.tcp://flink@dmpkit-dev-dn2:24660] has failed, address is now gated
> >> for [5000] ms. Reason: [Association failed with
> >> [akka.tcp://flink@dmpkit-dev-dn2:24660]] Caused by: [Connection refused:
> >> dmpkit-dev-dn2/136.243.170.205:24660] 2017-06-20 16:45:13,989 ERROR
> >> org.apache.flink.yarn.YarnFlinkResourceManager                - Resource
> >> manager could not register at JobManager
> >> akka.pattern.AskTimeoutException:
> >> Ask timed out on
> >> [ActorSelection[Anchor(akka.tcp://flink@dmpkit-dev-dn2:24660/),
> >> Path(/user/jobmanager)]] after [10000 ms] at
> >> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334
> >> )
> >> at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
> >>
> >> at
> >>
> >> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future
> >> .s
> >> cala:599) at
> >> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:10
> >> 9)
> >> at
> >> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:59
> >> 7)
> >> at
> >> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.s
> >> ca
> >> la:474) at
> >> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.
> >> sc
> >> ala:425) at
> >> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:4
> >> 29
> >> ) at
> >> akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
> >> at java.lang.Thread.run(Thread.java:745)
> >> 2017-06-20 16:45:13,990 INFO
> >> org.apache.flink.yarn.YarnFlinkResourceManager>>
> >>               - Trying to associate with JobManager leader
> >>
> >> akka.tcp://flink@dmpkit-dev-dn2:24660/user/jobmanager 2017-06-20
> >> 16:45:13,993 WARN  akka.remote.ReliableDeliverySupervisor
> >>
> >>     - Association with remote system
> >>
> >> [akka.tcp://flink@dmpkit-dev-dn2:24660] has failed, address is now gated
> >> for [5000] ms. Reason: [Association failed with
> >> [akka.tcp://flink@dmpkit-dev-dn2:24660]] Caused by: [Connection refused:
> >> dmpkit-dev-dn2/136.243.170.205:24660] 2017-06-20 16:45:18,174 ERROR
> >> org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler  -
> >> Caught
> >> exception akka.actor.ActorNotFound: Actor not found for:
> >> ActorSelection[Anchor(akka.tcp://flink@dmpkit-dev-dn2:24660/),
> >> Path(/user/jobmanager)] at
> >> akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scal
> >> a:
> >> 65) at
> >> akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scal
> >> a:
> >> 63) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> >>
> >> at
> >>
> >> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecuto
> >> r.
> >> scala:55) at
> >> akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73) at
> >> akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExec
> >> ut
> >> e(Future.scala:74) at
> >> akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:120)
> >> at
> >> akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Futur
> >> e.
> >> scala:73) at
> >> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
> >> at
> >> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:24
> >> 8)
> >> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:266)
> >>
> >> at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:533)
> >> at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:569)
> >> at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:559)
> >> at
> >>
> >> akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteA
> >> ct
> >> orRefProvider.scala:87) at
> >> akka.remote.EndpointWriter.postStop(Endpoint.scala:583)
> >>
> >> at akka.actor.Actor$class.aroundPostStop(Actor.scala:477)
> >> at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:437)
> >> at
> >>
> >> akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$
> >> fi
> >> nishTerminate(FaultHandling.scala:210) at
> >> akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
> >> at akka.actor.ActorCell.terminate(ActorCell.scala:369)
> >>
> >> at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
> >> at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
> >> at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
> >> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> >> at
> >>
> >> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(Abstract
> >> Di
> >> spatcher.scala:397) at
> >> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at
> >> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.jav
> >> a:
> >> 1339) at
> >> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >> at
> >> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.j
> >> av
> >> a:107) 2017-06-20 16:45:24,010 ERROR
> >> org.apache.flink.yarn.YarnFlinkResourceManager                - Resource
> >> manager could not register at JobManager
> >> akka.pattern.AskTimeoutException:
> >> Ask timed out on
> >> [ActorSelection[Anchor(akka.tcp://flink@dmpkit-dev-dn2:24660/),
> >> Path(/user/jobmanager)]] after [10000 ms] at
> >> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334
> >> )
> >> at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
> >>
> >> at
> >>
> >> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future
> >> .s
> >> cala:599) at
> >> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:10
> >> 9)
> >> at
> >> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:59
> >> 7)
> >> at
> >> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.s
> >> ca
> >> la:474) at
> >> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.
> >> sc
> >> ala:425) at
> >> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:4
> >> 29
> >> ) at
> >> akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
> >> at java.lang.Thread.run(Thread.java:745)
> >> 2017-06-20 16:45:24,010 INFO
> >> org.apache.flink.yarn.YarnFlinkResourceManager>>
> >>               - Trying to associate with JobManager leader
> >>
> >> akka.tcp://flink@dmpkit-dev-dn2:24660/user/jobmanager 2017-06-20
> >> 16:45:24,014 WARN  akka.remote.ReliableDeliverySupervisor
> >>
> >>     - Association with remote system
> >>
> >> [akka.tcp://flink@dmpkit-dev-dn2:24660] has failed, address is now gated
> >> for [5000] ms. Reason: [Association failed with
> >> [akka.tcp://flink@dmpkit-dev-dn2:24660]] Caused by: [Connection refused:
> >> dmpkit-dev-dn2/136.243.170.205:24660] 2017-06-20 16:45:24,772 ERROR
> >> org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler  -
> >> Caught
> >> exception akka.actor.ActorNotFound: Actor not found for:
> >> ActorSelection[Anchor(akka.tcp://flink@dmpkit-dev-dn2:24660/),
> >> Path(/user/jobmanager)] at
> >> akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scal
> >> a:
> >> 65) at
> >> akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scal
> >> a:
> >> 63) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> >>
> >> at
> >>
> >> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecuto
> >> r.
> >> scala:55) at
> >> akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73) at
> >> akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExec
> >> ut
> >> e(Future.scala:74) at
> >> akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:120)
> >> at
> >> akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Futur
> >> e.
> >> scala:73) at
> >> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
> >> at
> >> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:24
> >> 8)
> >> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:266)
> >>
> >> at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:533)
> >> at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:569)
> >> at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:559)
> >> at
> >>
> >> akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteA
> >> ct
> >> orRefProvider.scala:87) at
> >> akka.remote.EndpointWriter.postStop(Endpoint.scala:583)
> >>
> >> at akka.actor.Actor$class.aroundPostStop(Actor.scala:477)
> >> at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:437)
> >> at
> >>
> >> akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$
> >> fi
> >> nishTerminate(FaultHandling.scala:210) at
> >> akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
> >> at akka.actor.ActorCell.terminate(ActorCell.scala:369)
> >>
> >> at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
> >> at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
> >> at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
> >> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> >> at
> >>
> >> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(Abstract
> >> Di
> >> spatcher.scala:397) at
> >> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at
> >> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.jav
> >> a:
> >> 1339) at
> >> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >> at
> >> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.j
> >> av
> >> a:107) 2017-06-20 16:45:34,029 ERROR
> >> org.apache.flink.yarn.YarnFlinkResourceManager                - Resource
> >> manager could not register at JobManager
> >> akka.pattern.AskTimeoutException:
> >> Ask timed out on
> >> [ActorSelection[Anchor(akka.tcp://flink@dmpkit-dev-dn2:24660/),
> >> Path(/user/jobmanager)]] after [10000 ms] at
> >> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334
> >> )
> >> at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
> >>
> >> at
> >>
> >> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future
> >> .s
> >> cala:599) at
> >> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:10
> >> 9)
> >> at
> >> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:59
> >> 7)
> >> at
> >> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.s
> >> ca
> >> la:474) at
> >> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.
> >> sc
> >> ala:425) at
> >> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:4
> >> 29
> >> ) at
> >> akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
> >> at java.lang.Thread.run(Thread.java:745)
> >> 2017-06-20 16:45:34,030 INFO
> >> org.apache.flink.yarn.YarnFlinkResourceManager>>
> >>               - Trying to associate with JobManager leader
> >>
> >> akka.tcp://flink@dmpkit-dev-dn2:24660/user/jobmanager
> >>
> >>
> >> Finally, Job manager starts and works properly, just wanted to understand
> >> the cause of the error.
> >>
> >>
> >> Many thanks in advance!
> >>
> >>
> >> Kind Regards,
> >> Mike Pryakhin
> >>
> >>> On 20 Jun 2017, at 17:34, Nico Kruber <[hidden email]> wrote:
> >>>
> >>> Hi Mike,
> >>> have you configured zookeeper [1] ? afaik, it is required for a high-
> >>> availability (YARN) session and is used to store JobManager state.
> >>> Without
> >>> it, a recovery would not know what to recover from.
> >>>
> >>>
> >>> Nico
> >>>
> >>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/
> >>> jobmanager_high_availability.html#yarn-cluster-high-availability
> >>>
> >>> On Tuesday, 20 June 2017 13:23:35 CEST Mikhail Pryakhin wrote:
> >>>> Hello,
> >>>>
> >>>> I'm currently trying to check whether my job is restarted in case of
> >>>> Job
> >>>> Manager failure. The job is submitted as a single job on YARN with the
> >>>> following options set in the flink-conf.yaml:
> >>>>
> >>>> restart-strategy: fixed-delay
> >>>> restart-strategy.fixed-delay.attempts: 3
> >>>> restart-strategy.fixed-delay.delay: 10 s
> >>>>
> >>>> Then I kill the Job Manager container. After that YARN starts a new Job
> >>>> Manager container but the job is not started. What am I doing wrong? Do
> >>>> I
> >>>> need something else to be configured to enable job restarts on JM
> >>>> failure?
> >>>>
> >>>> I'm using flink 1.3 Hadoop 2.6
> >>>>
> >>>> Thanks in advance.
> >>>>
> >>>> Kind Regards,
> >>>> Mike Pryakhin



Re: Can't get my job restarted on job manager failures

Mikhail Pryakhin-2
Many thanks, Nico!

Everything is clear now.

Kind Regards,
Mike Pryakhin

On 20 Jun 2017, at 19:24, Nico Kruber <[hidden email]> wrote:

Yes, this supports my theory: both the ResourceManager and the TaskManager
instances are trying to reconnect to the old JobManager to re-establish a link
in case of temporary failures.
* In the best case, the connection is re-established and everything continues.
* Since your failure is permanent, this is not the case, and after some
retries Flink comes to the same conclusion ;)
We can't really avoid the warnings though, even in an HA setup, since the
administrator might want to know what is happening and what Flink is doing
under the hood.

Everything's fine then.

Nico

On Tuesday, 20 June 2017 17:56:00 CEST Mikhail Pryakhin wrote:
Thanks a lot, Nico!

It is definitely a permanent failure, I’ve tried multiple times and always
got the same behaviour. And as you said, right after the job manager has
started the errors stop as well.

I also checked the logs of the Task Manager and found a huge number of the
following warnings:

2017-06-20 18:38:45,087 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@dmpkit-dev-dn2:30086] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@dmpkit-dev-dn2:30086]] Caused by: [Connection refused: dmpkit-dev-dn2/136.243.170.205:30086]
2017-06-20 18:38:50,105 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@dmpkit-dev-dn2:30086] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@dmpkit-dev-dn2:30086]] Caused by: [Connection refused: dmpkit-dev-dn2/136.243.170.205:30086]
2017-06-20 18:38:55,125 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@dmpkit-dev-dn2:30086] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@dmpkit-dev-dn2:30086]] Caused by: [Connection refused: dmpkit-dev-dn2/136.243.170.205:30086]

The warnings above finally end with this error message:

2017-06-20 18:41:30,744 ERROR Remoting - Association to [akka.tcp://flink@dmpkit-dev-dn2:30086] with UID [77777149] irrecoverably failed. Quarantining address.
java.util.concurrent.TimeoutException: Delivery of system messages timed out and they were dropped.
    at akka.remote.ReliableDeliverySupervisor$$anonfun$gated$1.applyOrElse(Endpoint.scala:336)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:189)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

After that, multiple INFO lines like this one follow:

2017-06-20 18:47:40,453 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking the coordinator 2147483480 dead.





Kind Regards,
Mike Pryakhin

On 20 Jun 2017, at 18:27, Nico Kruber <[hidden email]> wrote:

My best guess here is that the resource manager is still trying to connect
to the JobManager which failed - after all, how should it know whether
this is a temporary network failure or a permanent failure?!

If, after your new JobManager starts, the errors stop as well, I'd say you
don't have to worry about the messages.
Till (cc'd) may elaborate a bit more on this.


Nico

On Tuesday, 20 June 2017 17:06:00 CEST Mikhail Pryakhin wrote:
Hi Nico,
Thanks for your reply!

With the ZooKeeper-related properties configured, everything works smoothly!
I was confused because the docs describe the high-availability configuration
in terms of YARN session mode. Anyway, thanks a lot!
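
For readers following along, the ZooKeeper-related properties mentioned above would look roughly like the sketch below in flink-conf.yaml for Flink 1.3 on YARN. The key names are the ones used in the Flink 1.3 high-availability documentation. The quorum hosts, the HDFS path and the attempt count are placeholder assumptions and are not values taken from this thread.

```yaml
# flink-conf.yaml (sketch; all values are placeholders, not from this thread)

# Turn on ZooKeeper-based JobManager high availability.
high-availability: zookeeper

# ZooKeeper quorum used for leader election and pointers to recovery state
# (hypothetical host names).
high-availability.zookeeper.quorum: zk-host-1:2181,zk-host-2:2181,zk-host-3:2181

# Durable storage for JobManager recovery metadata (hypothetical HDFS path).
high-availability.zookeeper.storageDir: hdfs:///flink/recovery

# Root znode under which Flink keeps its cluster nodes.
high-availability.zookeeper.path.root: /flink

# How many times YARN may restart the ApplicationMaster (the JobManager).
yarn.application-attempts: 10
```

Note that the restart-strategy settings only cover task failures within a running job. Recovery after a JobManager failure relies on the high-availability settings, and yarn.application-attempts is additionally capped by YARN's own yarn.resourcemanager.am.max-attempts setting.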

Now I have noticed another problem: when I kill the Job Manager, YARN restarts
it, and the following stack trace appears in the Job Manager log:

2017-06-20 16:44:53,843 WARN
org.apache.flink.runtime.webmonitor.JobManagerRetriever       - Failed to
retrieve leader gateway and port. akka.actor.ActorNotFound: Actor not
found
for: ActorSelection[Anchor(<a href="akka.tcp://flink@dmpkit-dev-dn2:24660/" class="">akka.tcp://flink@dmpkit-dev-dn2:24660/),
Path(/user/jobmanager)] at
akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scal
a:
65) at
akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scal
a:
63) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)

at

akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecuto
r.
scala:55) at
akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73) at
akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExec
ut
e(Future.scala:74) at
akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:120)
at
akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Futur
e.
scala:73) at
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:24
8)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:266)

at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:533)
at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:569)
at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:559)
at

akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteA
ct
orRefProvider.scala:87) at
akka.remote.EndpointWriter.postStop(Endpoint.scala:583)

at akka.actor.Actor$class.aroundPostStop(Actor.scala:477)
at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:437)
at

akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$
fi
nishTerminate(FaultHandling.scala:210) at
akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
at akka.actor.ActorCell.terminate(ActorCell.scala:369)

at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at

akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(Abstract
Di
spatcher.scala:397) at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.jav
a:
1339) at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.j
av
a:107) 2017-06-20 16:44:53,932 INFO
org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl         - Received
new token for : dmpkit-dev-dn2:8041 2017-06-20 16:44:53,937 INFO
org.apache.flink.yarn.YarnFlinkResourceManager                - Retrieved
1
TaskManagers from previous attempt 2017-06-20 16:44:53,948 INFO
org.apache.flink.yarn.YarnFlinkResourceManager                - Trying to
associate with JobManager leader
<a href="akka.tcp://flink@dmpkit-dev-dn2:24660/user/jobmanager" class="">akka.tcp://flink@dmpkit-dev-dn2:24660/user/jobmanager 2017-06-20
16:45:03,970 ERROR org.apache.flink.yarn.YarnFlinkResourceManager

   - Resource manager could not register at JobManager

akka.pattern.AskTimeoutException: Ask timed out on
[ActorSelection[Anchor(<a href="akka.tcp://flink@dmpkit-dev-dn2:24660/" class="">akka.tcp://flink@dmpkit-dev-dn2:24660/),
Path(/user/jobmanager)]] after [10000 ms] at
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334
)
at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)

at

scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future
.s
cala:599) at
scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:10
9)
at
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:59
7)
at
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.s
ca
la:474) at
akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.
sc
ala:425) at
akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:4
29
) at
akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
at java.lang.Thread.run(Thread.java:745)
2017-06-20 16:45:03,971 INFO
org.apache.flink.yarn.YarnFlinkResourceManager>>
             - Trying to associate with JobManager leader

<a href="akka.tcp://flink@dmpkit-dev-dn2:24660/user/jobmanager" class="">akka.tcp://flink@dmpkit-dev-dn2:24660/user/jobmanager 2017-06-20
16:45:03,975 WARN  akka.remote.ReliableDeliverySupervisor

   - Association with remote system

[<a href="akka.tcp://flink@dmpkit-dev-dn2:24660" class="">akka.tcp://flink@dmpkit-dev-dn2:24660] has failed, address is now gated
for [5000] ms. Reason: [Association failed with
[<a href="akka.tcp://flink@dmpkit-dev-dn2:24660" class="">akka.tcp://flink@dmpkit-dev-dn2:24660]] Caused by: [Connection refused:
dmpkit-dev-dn2/136.243.170.205:24660] 2017-06-20 16:45:13,989 ERROR
org.apache.flink.yarn.YarnFlinkResourceManager                - Resource
manager could not register at JobManager
akka.pattern.AskTimeoutException:
Ask timed out on
[ActorSelection[Anchor(<a href="akka.tcp://flink@dmpkit-dev-dn2:24660/" class="">akka.tcp://flink@dmpkit-dev-dn2:24660/),
Path(/user/jobmanager)]] after [10000 ms] at
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334
)
at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)

at

scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future
.s
cala:599) at
scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:10
9)
at
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:59
7)
at
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.s
ca
la:474) at
akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.
sc
ala:425) at
akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:4
29
) at
akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
at java.lang.Thread.run(Thread.java:745)
2017-06-20 16:45:13,990 INFO
org.apache.flink.yarn.YarnFlinkResourceManager>>
             - Trying to associate with JobManager leader

<a href="akka.tcp://flink@dmpkit-dev-dn2:24660/user/jobmanager" class="">akka.tcp://flink@dmpkit-dev-dn2:24660/user/jobmanager 2017-06-20
16:45:13,993 WARN  akka.remote.ReliableDeliverySupervisor

   - Association with remote system

[<a href="akka.tcp://flink@dmpkit-dev-dn2:24660" class="">akka.tcp://flink@dmpkit-dev-dn2:24660] has failed, address is now gated
for [5000] ms. Reason: [Association failed with
[<a href="akka.tcp://flink@dmpkit-dev-dn2:24660" class="">akka.tcp://flink@dmpkit-dev-dn2:24660]] Caused by: [Connection refused:
dmpkit-dev-dn2/136.243.170.205:24660] 2017-06-20 16:45:18,174 ERROR
org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler  -
Caught
exception akka.actor.ActorNotFound: Actor not found for:
ActorSelection[Anchor(<a href="akka.tcp://flink@dmpkit-dev-dn2:24660/" class="">akka.tcp://flink@dmpkit-dev-dn2:24660/),
Path(/user/jobmanager)] at
akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scal
a:
65) at
akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scal
a:
63) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)

at

akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecuto
r.
scala:55) at
akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73) at
akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExec
ut
e(Future.scala:74) at
akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:120)
at
akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Futur
e.
scala:73) at
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:24
8)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:266)

at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:533)
at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:569)
at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:559)
at

akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteA
ct
orRefProvider.scala:87) at
akka.remote.EndpointWriter.postStop(Endpoint.scala:583)

at akka.actor.Actor$class.aroundPostStop(Actor.scala:477)
at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:437)
at

akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$
fi
nishTerminate(FaultHandling.scala:210) at
akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
at akka.actor.ActorCell.terminate(ActorCell.scala:369)

at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at

akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(Abstract
Di
spatcher.scala:397) at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.jav
a:
1339) at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.j
av
a:107) 2017-06-20 16:45:24,010 ERROR
org.apache.flink.yarn.YarnFlinkResourceManager                - Resource
manager could not register at JobManager
akka.pattern.AskTimeoutException:
Ask timed out on
[ActorSelection[Anchor(<a href="akka.tcp://flink@dmpkit-dev-dn2:24660/" class="">akka.tcp://flink@dmpkit-dev-dn2:24660/),
Path(/user/jobmanager)]] after [10000 ms] at
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334
)
at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)

at

scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future
.s
cala:599) at
scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:10
9)
at
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:59
7)
at
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.s
ca
la:474) at
akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.
sc
ala:425) at
akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:4
29
) at
akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
at java.lang.Thread.run(Thread.java:745)
2017-06-20 16:45:24,010 INFO
org.apache.flink.yarn.YarnFlinkResourceManager>>
             - Trying to associate with JobManager leader

<a href="akka.tcp://flink@dmpkit-dev-dn2:24660/user/jobmanager" class="">akka.tcp://flink@dmpkit-dev-dn2:24660/user/jobmanager 2017-06-20
16:45:24,014 WARN  akka.remote.ReliableDeliverySupervisor

   - Association with remote system

[<a href="akka.tcp://flink@dmpkit-dev-dn2:24660" class="">akka.tcp://flink@dmpkit-dev-dn2:24660] has failed, address is now gated
for [5000] ms. Reason: [Association failed with
[<a href="akka.tcp://flink@dmpkit-dev-dn2:24660" class="">akka.tcp://flink@dmpkit-dev-dn2:24660]] Caused by: [Connection refused:
dmpkit-dev-dn2/136.243.170.205:24660] 2017-06-20 16:45:24,772 ERROR
org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler  -
Caught
exception akka.actor.ActorNotFound: Actor not found for:
ActorSelection[Anchor(<a href="akka.tcp://flink@dmpkit-dev-dn2:24660/" class="">akka.tcp://flink@dmpkit-dev-dn2:24660/),
Path(/user/jobmanager)] at
akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scal
a:
65) at
akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scal
a:
63) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)

at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73)
at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:120)
at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:266)
at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:533)
at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:569)
at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:559)
at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
at akka.remote.EndpointWriter.postStop(Endpoint.scala:583)
at akka.actor.Actor$class.aroundPostStop(Actor.scala:477)
at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:437)
at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
at akka.actor.ActorCell.terminate(ActorCell.scala:369)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2017-06-20 16:45:34,029 ERROR org.apache.flink.yarn.YarnFlinkResourceManager - Resource manager could not register at JobManager
akka.pattern.AskTimeoutException: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@dmpkit-dev-dn2:24660/), Path(/user/jobmanager)]] after [10000 ms]
at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
at java.lang.Thread.run(Thread.java:745)
2017-06-20 16:45:34,030 INFO  org.apache.flink.yarn.YarnFlinkResourceManager - Trying to associate with JobManager leader akka.tcp://flink@dmpkit-dev-dn2:24660/user/jobmanager


Eventually the Job Manager starts and works properly; I just wanted to understand
the cause of the error.


Many thanks in advance!


Kind Regards,
Mike Pryakhin

On 20 Jun 2017, at 17:34, Nico Kruber <[hidden email]> wrote:

Hi Mike,
have you configured ZooKeeper [1]? AFAIK, it is required for a
high-availability (YARN) session and is used to store JobManager state.
Without it, a recovery would not know what to recover from.


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/jobmanager_high_availability.html#yarn-cluster-high-availability

On Tuesday, 20 June 2017 13:23:35 CEST Mikhail Pryakhin wrote:
Hello,

I'm currently trying to check whether my job is restarted in case of Job
Manager failure. The job is submitted as a single job on YARN with the
following options set in the flink-conf.yaml:

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s

Then I kill the Job Manager container. After that YARN starts a new Job
Manager container but the job is not started. What am I doing wrong? Do I
need something else to be configured to enable job restarts on JM failure?

I'm using Flink 1.3 and Hadoop 2.6.

Thanks in advance.

Kind Regards,
Mike Pryakhin


