Flink Application on YARN failed on losing Job Manager | No recovery | Need help debug the cause from logs

Anchit Jatana
Hi All,

I started my Flink application on YARN using flink run -m yarn-cluster. After running smoothly for 20 hours, it failed. Given the fault-tolerant nature of Flink on YARN, the application should ideally have recovered after losing the JobManager (which runs in the same container as the ApplicationMaster), but it did not recover and failed instead.

Please help me find the cause of the failure from the logs.

Thank you

Regards,
Anchit

Below are the logs:

2016-11-01 14:12:37,592 INFO  org.apache.flink.runtime.client.JobClientActor                - 11/01/2016 14:12:36 Parse & Map Record - (Visitor ID, Product List)  -> Filtering None Objects -> Fetching Output(148/200) switched to RUNNING
2016-11-02 10:16:42,960 INFO  org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider  - Failing over to rm1
2016-11-02 10:17:24,026 INFO  org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider  - Failing over to rm2
2016-11-02 10:17:40,882 INFO  org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider  - Failing over to rm1
2016-11-02 10:24:41,964 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@10.66.245.26:47722] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
2016-11-02 10:24:56,311 WARN  Remoting                                                      - Tried to associate with unreachable remote address [akka.tcp://flink@10.66.245.26:47722]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: /10.66.245.26:47722
2016-11-02 10:24:56,315 INFO  org.apache.flink.runtime.client.JobClientActor                - Lost connection to JobManager akka.tcp://flink@10.66.245.26:47722/user/jobmanager. Triggering connection timeout.
2016-11-02 10:24:56,315 INFO  org.apache.flink.runtime.client.JobClientActor                - Disconnect from JobManager Actor[akka.tcp://flink@10.66.245.26:47722/user/jobmanager#1251121709].
2016-11-02 10:25:56,330 INFO  org.apache.flink.runtime.client.JobClientActor                - Terminate JobClientActor.
2016-11-02 10:25:56,331 INFO  org.apache.flink.runtime.client.JobClientActor                - Disconnect from JobManager null.
2016-11-02 10:25:56,333 ERROR org.apache.flink.client.CliFrontend                           - Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Communication with JobManager failed: Lost connection to the JobManager.
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:405)
        at org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:204)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:378)
        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:68)
        at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:585)
        at com.tgt.prz.streaming.recs.drivers.SessionRecs2$.main(SessionRecs2.scala:126)
        at com.tgt.prz.streaming.recs.drivers.SessionRecs2.main(SessionRecs2.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:320)
        at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
        at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:997)
        at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:994)
        at org.apache.flink.runtime.security.SecurityUtils$1.run(SecurityUtils.java:56)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
        at org.apache.flink.runtime.security.SecurityUtils.runSecured(SecurityUtils.java:53)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:994)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Communication with JobManager failed: Lost connection to the JobManager.
        at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:137)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:401)
        ... 24 more
Caused by: org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException: Lost connection to the JobManager.
        at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:252)
        at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:90)
        at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:70)
        at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2016-11-02 10:25:56,341 INFO  org.apache.flink.yarn.YarnClusterClient                       - Sending shutdown request to the Application Master
2016-11-02 10:25:56,341 INFO  org.apache.flink.yarn.YarnClusterClient                       - Start application client.
2016-11-02 10:25:56,344 WARN  org.apache.flink.yarn.YarnClusterClient                       - YARN reported application state FAILED
2016-11-02 10:25:56,344 WARN  org.apache.flink.yarn.YarnClusterClient                       - Diagnostics: Application application_1476277440022_40328 failed 1 times due to Attempt recovered after RM restartAM Container for appattempt_1476277440022_40328_000001 exited with  exitCode: 243
For more detailed output, check application tracking page:http://mydns:8088/cluster/app/application_1476277440022_40328Then, click on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_e3066_1476277440022_40328_01_000001
Exit code: 243
Stack trace: ExitCodeException exitCode=243:
        at org.apache.hadoop.util.Shell.runCommand(Shell.java:576)
        at org.apache.hadoop.util.Shell.run(Shell.java:487)
        at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:753)
        at org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor.launchContainer(LinuxContainerExecutor.java:371)
        at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
        at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Shell output: main : command provided 1
main : run as user is A12345
main : requested yarn user is A12345


Container exited with a non-zero exit code 243
Failing this attempt. Failing the application.
2016-11-02 10:25:56,346 INFO  org.apache.flink.yarn.ApplicationClient                       - Notification about new leader address akka.tcp://flink@10.66.245.26:47722/user/jobmanager with session ID null.
2016-11-02 10:25:56,349 INFO  org.apache.flink.yarn.ApplicationClient                       - Sending StopCluster request to JobManager.
2016-11-02 10:25:56,350 INFO  org.apache.flink.yarn.ApplicationClient                       - Received address of new leader akka.tcp://flink@10.66.245.26:47722/user/jobmanager with session ID null.
2016-11-02 10:25:56,351 INFO  org.apache.flink.yarn.ApplicationClient                       - Disconnect from JobManager null.
2016-11-02 10:25:56,353 INFO  org.apache.flink.yarn.ApplicationClient                       - Trying to register at JobManager akka.tcp://flink@10.66.245.26:47722/user/jobmanager.
2016-11-02 10:25:56,363 WARN  Remoting                                                      - Tried to associate with unreachable remote address [akka.tcp://flink@10.66.245.26:47722]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: /10.66.245.26:47722
2016-11-02 10:25:56,870 INFO  org.apache.flink.yarn.ApplicationClient                       - Trying to register at JobManager akka.tcp://flink@10.66.245.26:47722/user/jobmanager.
2016-11-02 10:25:57,369 INFO  org.apache.flink.yarn.ApplicationClient                       - Sending StopCluster request to JobManager.
2016-11-02 10:25:57,889 INFO  org.apache.flink.yarn.ApplicationClient                       - Trying to register at JobManager akka.tcp://flink@10.66.245.26:47722/user/jobmanager.
2016-11-02 10:25:58,389 INFO  org.apache.flink.yarn.ApplicationClient                       - Sending StopCluster request to JobManager.
2016-11-02 10:25:59,410 INFO  org.apache.flink.yarn.ApplicationClient                       - Sending StopCluster request to JobManager.
2016-11-02 10:25:59,909 INFO  org.apache.flink.yarn.ApplicationClient                       - Trying to register at JobManager akka.tcp://flink@10.66.245.26:47722/user/jobmanager.
2016-11-02 10:26:00,429 INFO  org.apache.flink.yarn.ApplicationClient                       - Sending StopCluster request to JobManager.
2016-11-02 10:26:01,449 INFO  org.apache.flink.yarn.ApplicationClient                       - Sending StopCluster request to JobManager.
2016-11-02 10:26:02,469 INFO  org.apache.flink.yarn.ApplicationClient                       - Sending StopCluster request to JobManager.
2016-11-02 10:26:03,489 INFO  org.apache.flink.yarn.ApplicationClient                       - Sending StopCluster request to JobManager.
2016-11-02 10:26:03,929 INFO  org.apache.flink.yarn.ApplicationClient                       - Trying to register at JobManager akka.tcp://flink@10.66.245.26:47722/user/jobmanager.
2016-11-02 10:26:03,935 WARN  Remoting                                                      - Tried to associate with unreachable remote address [akka.tcp://flink@10.66.245.26:47722]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: /10.66.245.26:47722
2016-11-02 10:26:04,509 INFO  org.apache.flink.yarn.ApplicationClient                       - Sending StopCluster request to JobManager.
2016-11-02 10:26:05,529 INFO  org.apache.flink.yarn.ApplicationClient                       - Sending StopCluster request to JobManager.
2016-11-02 10:26:06,345 WARN  org.apache.flink.yarn.YarnClusterClient                       - Error while stopping YARN cluster.
java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:153)
        at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:86)
        at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:86)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.ready(package.scala:86)
        at scala.concurrent.Await.ready(package.scala)
        at org.apache.flink.yarn.YarnClusterClient.shutdownCluster(YarnClusterClient.java:366)
        at org.apache.flink.yarn.YarnClusterClient.finalizeCluster(YarnClusterClient.java:336)
        at org.apache.flink.client.program.ClusterClient.shutdown(ClusterClient.java:206)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:260)
        at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:997)
        at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:994)
        at org.apache.flink.runtime.security.SecurityUtils$1.run(SecurityUtils.java:56)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
        at org.apache.flink.runtime.security.SecurityUtils.runSecured(SecurityUtils.java:53)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:994)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
2016-11-02 10:26:06,347 INFO  org.apache.flink.yarn.YarnClusterClient                       - Deleting files in hdfs://littleredns/user/A12345/.flink/application_1476277440022_40328
2016-11-02 10:26:06,530 INFO  org.apache.flink.yarn.YarnClusterClient                       - Application application_1476277440022_40328 finished with state FAILED and final state FAILED at 1478100282775
2016-11-02 10:26:06,530 WARN  org.apache.flink.yarn.YarnClusterClient                       - Application failed. Diagnostics Application application_1476277440022_40328 failed 1 times due to Attempt recovered after RM restartAM Container for appattempt_1476277440022_40328_000001 exited with  exitCode: 243
For more detailed output, check application tracking page:http://mydns/cluster/app/application_1476277440022_40328Then, click on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_e3066_1476277440022_40328_01_000001
Exit code: 243
Stack trace: ExitCodeException exitCode=243:
        at org.apache.hadoop.util.Shell.runCommand(Shell.java:576)
        at org.apache.hadoop.util.Shell.run(Shell.java:487)
        at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:753)
        at org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor.launchContainer(LinuxContainerExecutor.java:371)
        at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
        at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Shell output: main : command provided 1
main : run as user is A12345
main : requested yarn user is A12345


Container exited with a non-zero exit code 243
Failing this attempt. Failing the application.
2016-11-02 10:26:06,531 WARN  org.apache.flink.yarn.YarnClusterClient                       - If log aggregation is activated in the Hadoop cluster, we recommend to retrieve the full application log using this command:
        yarn logs -appReport application_1476277440022_40328
(It sometimes takes a few seconds until the logs are aggregated)
2016-11-02 10:26:06,531 INFO  org.apache.flink.yarn.YarnClusterClient                       - YARN Client is shutting down
2016-11-02 10:26:06,532 INFO  org.apache.flink.yarn.ApplicationClient                       - Stopped Application client.
2016-11-02 10:26:06,533 INFO  org.apache.flink.yarn.ApplicationClient                       - Disconnect from JobManager null.
2016-11-02 10:26:06,536 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Shutting down remote daemon.
2016-11-02 10:26:06,537 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remote daemon shut down; proceeding with flushing remote transports.
2016-11-02 10:26:06,558 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remoting shut down.

Re: Flink Application on YARN failed on losing Job Manager | No recovery | Need help debug the cause from logs

Maximilian Michels
Hi Anchit,

An application can crash for many different reasons, e.g. errors in user
code or hardware/network failures. Have you configured high availability
for YARN as described in the documentation?
https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/jobmanager_high_availability.html
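
For reference, here is a minimal sketch of the YARN HA settings in flink-conf.yaml. It assumes Flink 1.1 key names (later releases renamed the recovery.* keys to high-availability.*) and an existing ZooKeeper ensemble. The ZooKeeper hosts and the HDFS path are placeholders, not values from this cluster. YARN's own yarn.resourcemanager.am.max-attempts (in yarn-site.xml) is an upper bound on yarn.application-attempts. The diagnostics in the logs ("failed 1 times ... Failing the application") are consistent with only a single AM attempt having been allowed.

```yaml
# flink-conf.yaml, Flink 1.1 key names; hosts and paths below are placeholders
recovery.mode: zookeeper
recovery.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
# Persistent storage for the metadata needed to recover from a JobManager failure
recovery.zookeeper.storageDir: hdfs:///flink/recovery
# How many ApplicationMaster (JobManager) attempts Flink asks YARN for;
# capped by yarn.resourcemanager.am.max-attempts in yarn-site.xml
yarn.application-attempts: 10
```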

-Max


On Wed, Nov 2, 2016 at 6:44 PM, Anchit Jatana wrote:

> Hi All,
>
> I started my flink application on YARN using flink run -m yarn-cluster,
> after running smoothly for 20 hrs it failed. Ideally the application should
> have recover on losing the Job Manger (which runs in the same container as
> the application master) pertaining to the fault tolerant nature of flink on
> YARN but it didn't recover and failed.
>
> Please help me debug the logs.
>
> Thank you
>
> Regards,
> Anchit
>
> Below are the logs:
>
> 2016-11-01 14:12:37,592 INFO  org.apache.flink.runtime.client.JobClientActor
> - 11/01/2016 14:12:36   Parse & Map Record - (Visitor ID, Product List)  ->
> Filtering None Objects -> Fetching Output(148/200) switched to RUNNING
> 2016-11-02 10:16:42,960 INFO
> org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider  - Failing
> over to rm1
> 2016-11-02 10:17:24,026 INFO
> org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider  - Failing
> over to rm2
> 2016-11-02 10:17:40,882 INFO
> org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider  - Failing
> over to rm1
> 2016-11-02 10:24:41,964 WARN  akka.remote.ReliableDeliverySupervisor
> - Association with remote system [akka.tcp://flink@10.66.245.26:47722] has
> failed, address is now gated for [5000] ms. Reason is: [Disassociated].
> 2016-11-02 10:24:56,311 WARN  Remoting
> - Tried to associate with unreachable remote address
> [akka.tcp://flink@10.66.245.26:47722]. Address is now gated for 5000 ms, all
> messages to this address will be delivered to dead letters. Reason:
> Connection refused: /10.66.245.26:47722
> 2016-11-02 10:24:56,315 INFO  org.apache.flink.runtime.client.JobClientActor
> - Lost connection to JobManager
> akka.tcp://flink@10.66.245.26:47722/user/jobmanager. Triggering connection
> timeout.
> 2016-11-02 10:24:56,315 INFO  org.apache.flink.runtime.client.JobClientActor
> - Disconnect from JobManager
> Actor[akka.tcp://flink@10.66.245.26:47722/user/jobmanager#1251121709].
> 2016-11-02 10:25:56,330 INFO  org.apache.flink.runtime.client.JobClientActor
> - Terminate JobClientActor.
> 2016-11-02 10:25:56,331 INFO  org.apache.flink.runtime.client.JobClientActor
> - Disconnect from JobManager null.
> 2016-11-02 10:25:56,333 ERROR org.apache.flink.client.CliFrontend
> - Error while running the command.
> org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: Communication with JobManager failed: Lost connection to
> the JobManager.
>         at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:405)
>         at
> org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:204)
>         at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:378)
>         at
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:68)
>         at
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:585)
>         at
> com.tgt.prz.streaming.recs.drivers.SessionRecs2$.main(SessionRecs2.scala:126)
>         at com.tgt.prz.streaming.recs.drivers.SessionRecs2.main(SessionRecs2.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:497)
>         at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
>         at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>         at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:320)
>         at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
>         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
>         at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:997)
>         at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:994)
>         at
> org.apache.flink.runtime.security.SecurityUtils$1.run(SecurityUtils.java:56)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:422)
>         at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
>         at
> org.apache.flink.runtime.security.SecurityUtils.runSecured(SecurityUtils.java:53)
>         at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:994)
>         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
> Caused by: org.apache.flink.runtime.client.JobExecutionException:
> Communication with JobManager failed: Lost connection to the JobManager.
>         at
> org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:137)
>         at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:401)
>         ... 24 more
> Caused by:
> org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException:
> Lost connection to the JobManager.
>         at
> org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:252)
>         at
> org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:90)
>         at
> org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:70)
>         at
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>         at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>         at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2016-11-02 10:25:56,341 INFO  org.apache.flink.yarn.YarnClusterClient
> - Sending shutdown request to the Application Master
> 2016-11-02 10:25:56,341 INFO  org.apache.flink.yarn.YarnClusterClient
> - Start application client.
> 2016-11-02 10:25:56,344 WARN  org.apache.flink.yarn.YarnClusterClient
> - YARN reported application state FAILED
> 2016-11-02 10:25:56,344 WARN  org.apache.flink.yarn.YarnClusterClient
> - Diagnostics: Application application_1476277440022_40328 failed 1 times
> due to Attempt recovered after RM restartAM Container for
> appattempt_1476277440022_40328_000001 exited with  exitCode: 243
> For more detailed output, check application tracking
> page:http://d-3zkvk02.target.com:8088/cluster/app/application_1476277440022_40328Then,
> click on links to logs of each attempt.
> Diagnostics: Exception from container-launch.
> Container id: container_e3066_1476277440022_40328_01_000001
> Exit code: 243
> Stack trace: ExitCodeException exitCode=243:
>         at org.apache.hadoop.util.Shell.runCommand(Shell.java:576)
>         at org.apache.hadoop.util.Shell.run(Shell.java:487)
>         at
> org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:753)
>         at
> org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor.launchContainer(LinuxContainerExecutor.java:371)
>         at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
>         at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>
> Shell output: main : command provided 1
> main : run as user is A12345
> main : requested yarn user is A12345
>
>
> Container exited with a non-zero exit code 243
> Failing this attempt. Failing the application.
> 2016-11-02 10:25:56,346 INFO  org.apache.flink.yarn.ApplicationClient
> - Notification about new leader address
> akka.tcp://flink@10.66.245.26:47722/user/jobmanager with session ID null.
> 2016-11-02 10:25:56,349 INFO  org.apache.flink.yarn.ApplicationClient
> - Sending StopCluster request to JobManager.
> 2016-11-02 10:25:56,350 INFO  org.apache.flink.yarn.ApplicationClient
> - Received address of new leader
> akka.tcp://flink@10.66.245.26:47722/user/jobmanager with session ID null.
> 2016-11-02 10:25:56,351 INFO  org.apache.flink.yarn.ApplicationClient
> - Disconnect from JobManager null.
> 2016-11-02 10:25:56,353 INFO  org.apache.flink.yarn.ApplicationClient
> - Trying to register at JobManager
> akka.tcp://flink@10.66.245.26:47722/user/jobmanager.
> 2016-11-02 10:25:56,363 WARN  Remoting
> - Tried to associate with unreachable remote address
> [akka.tcp://flink@10.66.245.26:47722]. Address is now gated for 5000 ms, all
> messages to this address will be delivered to dead letters. Reason:
> Connection refused: /10.66.245.26:47722
> 2016-11-02 10:25:56,870 INFO  org.apache.flink.yarn.ApplicationClient
> - Trying to register at JobManager
> akka.tcp://flink@10.66.245.26:47722/user/jobmanager.
> 2016-11-02 10:25:57,369 INFO  org.apache.flink.yarn.ApplicationClient
> - Sending StopCluster request to JobManager.
> 2016-11-02 10:25:57,889 INFO  org.apache.flink.yarn.ApplicationClient
> - Trying to register at JobManager
> akka.tcp://flink@10.66.245.26:47722/user/jobmanager.
> 2016-11-02 10:25:58,389 INFO  org.apache.flink.yarn.ApplicationClient
> - Sending StopCluster request to JobManager.
> 2016-11-02 10:25:59,410 INFO  org.apache.flink.yarn.ApplicationClient
> - Sending StopCluster request to JobManager.
> 2016-11-02 10:25:59,909 INFO  org.apache.flink.yarn.ApplicationClient
> - Trying to register at JobManager
> akka.tcp://flink@10.66.245.26:47722/user/jobmanager.
> 2016-11-02 10:26:00,429 INFO  org.apache.flink.yarn.ApplicationClient
> - Sending StopCluster request to JobManager.
> 2016-11-02 10:26:01,449 INFO  org.apache.flink.yarn.ApplicationClient
> - Sending StopCluster request to JobManager.
> 2016-11-02 10:26:02,469 INFO  org.apache.flink.yarn.ApplicationClient
> - Sending StopCluster request to JobManager.
> 2016-11-02 10:26:03,489 INFO  org.apache.flink.yarn.ApplicationClient
> - Sending StopCluster request to JobManager.
> 2016-11-02 10:26:03,929 INFO  org.apache.flink.yarn.ApplicationClient
> - Trying to register at JobManager
> akka.tcp://flink@10.66.245.26:47722/user/jobmanager.
> 2016-11-02 10:26:03,935 WARN  Remoting
> - Tried to associate with unreachable remote address
> [akka.tcp://flink@10.66.245.26:47722]. Address is now gated for 5000 ms, all
> messages to this address will be delivered to dead letters. Reason:
> Connection refused: /10.66.245.26:47722
> 2016-11-02 10:26:04,509 INFO  org.apache.flink.yarn.ApplicationClient
> - Sending StopCluster request to JobManager.
> 2016-11-02 10:26:05,529 INFO  org.apache.flink.yarn.ApplicationClient
> - Sending StopCluster request to JobManager.
> 2016-11-02 10:26:06,345 WARN  org.apache.flink.yarn.YarnClusterClient
> - Error while stopping YARN cluster.
> java.util.concurrent.TimeoutException: Futures timed out after [10000
> milliseconds]
>         at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>         at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:153)
>         at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:86)
>         at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:86)
>         at
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>         at scala.concurrent.Await$.ready(package.scala:86)
>         at scala.concurrent.Await.ready(package.scala)
>         at
> org.apache.flink.yarn.YarnClusterClient.shutdownCluster(YarnClusterClient.java:366)
>         at
> org.apache.flink.yarn.YarnClusterClient.finalizeCluster(YarnClusterClient.java:336)
>         at
> org.apache.flink.client.program.ClusterClient.shutdown(ClusterClient.java:206)
>         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:260)
>         at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:997)
>         at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:994)
>         at
> org.apache.flink.runtime.security.SecurityUtils$1.run(SecurityUtils.java:56)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:422)
>         at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
>         at
> org.apache.flink.runtime.security.SecurityUtils.runSecured(SecurityUtils.java:53)
>         at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:994)
>         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
> 2016-11-02 10:26:06,347 INFO  org.apache.flink.yarn.YarnClusterClient - Deleting files in hdfs://littleredns/user/A12345/.flink/application_1476277440022_40328
> 2016-11-02 10:26:06,530 INFO  org.apache.flink.yarn.YarnClusterClient - Application application_1476277440022_40328 finished with state FAILED and final state FAILED at 1478100282775
> 2016-11-02 10:26:06,530 WARN  org.apache.flink.yarn.YarnClusterClient - Application failed. Diagnostics Application application_1476277440022_40328 failed 1 times due to Attempt recovered after RM restartAM Container for appattempt_1476277440022_40328_000001 exited with  exitCode: 243
> For more detailed output, check application tracking page:http://d-3zkvk02.target.com:8088/cluster/app/application_1476277440022_40328Then, click on links to logs of each attempt.
> Diagnostics: Exception from container-launch.
> Container id: container_e3066_1476277440022_40328_01_000001
> Exit code: 243
> Stack trace: ExitCodeException exitCode=243:
>         at org.apache.hadoop.util.Shell.runCommand(Shell.java:576)
>         at org.apache.hadoop.util.Shell.run(Shell.java:487)
>         at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:753)
>         at org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor.launchContainer(LinuxContainerExecutor.java:371)
>         at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
>         at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>
> Shell output: main : command provided 1
> main : run as user is A12345
> main : requested yarn user is A12345
>
>
> Container exited with a non-zero exit code 243
> Failing this attempt. Failing the application.
> 2016-11-02 10:26:06,531 WARN  org.apache.flink.yarn.YarnClusterClient - If log aggregation is activated in the Hadoop cluster, we recommend to retrieve the full application log using this command:
>         yarn logs -appReport application_1476277440022_40328
> (It sometimes takes a few seconds until the logs are aggregated)
> 2016-11-02 10:26:06,531 INFO  org.apache.flink.yarn.YarnClusterClient - YARN Client is shutting down
> 2016-11-02 10:26:06,532 INFO  org.apache.flink.yarn.ApplicationClient - Stopped Application client.
> 2016-11-02 10:26:06,533 INFO  org.apache.flink.yarn.ApplicationClient - Disconnect from JobManager null.
> 2016-11-02 10:26:06,536 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting down remote daemon.
> 2016-11-02 10:26:06,537 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote daemon shut down; proceeding with flushing remote transports.
> 2016-11-02 10:26:06,558 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator - Remoting shut down.
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Application-on-YARN-failed-on-losing-Job-Manager-No-recovery-Need-help-debug-the-cause-from-los-tp9839.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Flink Application on YARN failed on losing Job Manager | No recovery | Need help debug the cause from logs

Anchit Jatana
Hi Maximilian,

Thanks for your response. I'm running the application on a YARN cluster in 'yarn-cluster' mode, i.e. using the 'flink run -m yarn-cluster ..' command. Is there anything else I need to configure apart from setting the 'yarn.application-attempts: 10' property in conf/flink-conf.yaml?

I just wanted to confirm whether anything else is needed to set up HA in 'yarn-cluster' mode.

Thank you

Regards,
Anchit

Re: Flink Application on YARN failed on losing Job Manager | No recovery | Need help debug the cause from logs

Maximilian Michels
Hi Anchit,

The documentation mentions that you need ZooKeeper in addition to
setting the application attempts. ZooKeeper is needed for the client to
retrieve the current leader and to filter out old leaders in case
multiple exist (old processes could even stay alive in YARN). Moreover, it
is needed to persist the state of the application.
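
To make this concrete for Anchit's question, here is a minimal, hypothetical Java sketch (not part of Flink) that parses `key: value` lines from a flink-conf.yaml and reports which ZooKeeper HA entries are missing besides `yarn.application-attempts`. The key names are assumed from the Flink 1.2 HA documentation (earlier releases used `recovery.*` names), and the check only covers the basics; it does not validate quorum addresses or paths.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical checker, not part of Flink. Key names assume the Flink 1.2
// high-availability.* options; earlier releases used recovery.* names.
class YarnHaConfigCheck {

    static final List<String> REQUIRED_KEYS = Arrays.asList(
            "high-availability",                  // expected value: zookeeper
            "high-availability.zookeeper.quorum", // ZooKeeper ensemble, host:port list
            "high-availability.storageDir",       // durable storage for JobManager metadata
            "yarn.application-attempts");         // > 1 so YARN restarts the ApplicationMaster

    /** Minimal "key: value" parser; skips blank lines and '#' comments. */
    static Map<String, String> parse(List<String> lines) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String raw : lines) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue;
            }
            int colon = line.indexOf(':'); // first colon separates key and value
            if (colon > 0) {
                conf.put(line.substring(0, colon).trim(), line.substring(colon + 1).trim());
            }
        }
        return conf;
    }

    /** Lists problems found; an empty list means the basic HA entries are present. */
    static List<String> check(Map<String, String> conf) {
        List<String> problems = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            if (!conf.containsKey(key)) {
                problems.add("missing " + key);
            }
        }
        String mode = conf.get("high-availability");
        if (mode != null && !mode.equalsIgnoreCase("zookeeper")) {
            problems.add("high-availability should be zookeeper, found " + mode);
        }
        String attempts = conf.get("yarn.application-attempts");
        if (attempts != null) {
            try {
                if (Integer.parseInt(attempts) < 2) {
                    problems.add("yarn.application-attempts should be at least 2");
                }
            } catch (NumberFormatException e) {
                problems.add("yarn.application-attempts is not a number: " + attempts);
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        // Anchit's setup: only the number of attempts was configured.
        Map<String, String> conf = parse(Arrays.asList("yarn.application-attempts: 10"));
        System.out.println(check(conf)); // reports the three missing high-availability.* keys
    }
}
```

Note that YARN's own `yarn.resourcemanager.am.max-attempts` setting (in yarn-site.xml) still caps how many attempts YARN actually grants, regardless of the Flink value.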


-Max



Re: Flink Application on YARN failed on losing Job Manager | No recovery | Need help debug the cause from logs

Josh
I have a follow-up question to this: if I'm running a job in 'yarn-cluster' mode with HA and at some point the YARN application fails due to a hardware failure (i.e. the YARN application moves to the "FINISHED"/"FAILED" state), how can I restore the job from the most recent checkpoint?

I can use `flink run -m yarn-cluster -s s3://my-savepoints/id .....` to restore from a savepoint, but what if I haven't manually taken a savepoint recently?

Thanks,
Josh


Re: Flink Application on YARN failed on losing Job Manager | No recovery | Need help debug the cause from logs

Ufuk Celebi
No, you don't need to trigger a savepoint manually. With HA, checkpoints
are persisted externally, and a pointer to them is stored in ZooKeeper to recover
them after a JobManager failure.
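
A toy model of the mechanism Ufuk describes, with every class and method name hypothetical: checkpoint data is written to durable storage, and only a small pointer (the file path) is registered under the job's ZooKeeper namespace. A temp directory stands in for HDFS/S3 and a sorted in-memory map stands in for ZooKeeper; this only illustrates the data flow and is not how Flink's classes are structured.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Toy model, not Flink code: checkpoint data lives in durable storage, while
// "ZooKeeper" (a sorted in-memory map here) only holds pointers to it.
class CheckpointPointerStore {
    private final Path storageDir;                            // stands in for HDFS/S3
    private final TreeMap<String, Path> zk = new TreeMap<>(); // stands in for ZooKeeper nodes

    CheckpointPointerStore(Path storageDir) {
        this.storageDir = storageDir;
    }

    private static String node(String namespace, long checkpointId) {
        // Zero-padded ids keep the map's string order equal to numeric order.
        return String.format("%s/checkpoints/%019d", namespace, checkpointId);
    }

    /** Writes the state to storage first, then registers a pointer under the namespace. */
    void completeCheckpoint(String namespace, long checkpointId, String state) throws IOException {
        Path file = storageDir.resolve(namespace.replace('/', '_') + "-chk-" + checkpointId);
        Files.write(file, state.getBytes(StandardCharsets.UTF_8));
        zk.put(node(namespace, checkpointId), file);
    }

    /** After a JobManager failure: follow the newest pointer in the namespace, if any. */
    Optional<String> recoverLatest(String namespace) throws IOException {
        String prefix = namespace + "/checkpoints/";
        Path latest = null;
        for (Map.Entry<String, Path> e : zk.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                latest = e.getValue(); // entries are sorted, so the last match is the newest
            }
        }
        if (latest == null) {
            return Optional.empty(); // no pointer: the job would start from scratch
        }
        return Optional.of(new String(Files.readAllBytes(latest), StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws IOException {
        CheckpointPointerStore store = new CheckpointPointerStore(Files.createTempDirectory("ha-storage"));
        store.completeCheckpoint("/flink/application_A", 1, "count=10");
        store.completeCheckpoint("/flink/application_A", 2, "count=25");
        System.out.println(store.recoverLatest("/flink/application_A")); // Optional[count=25]
        System.out.println(store.recoverLatest("/flink/application_B")); // Optional.empty
    }
}
```

The second lookup finds nothing because it uses a different namespace, which is the situation Josh runs into later in this thread when a new YARN application is started.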


Re: Flink Application on YARN failed on losing Job Manager | No recovery | Need help debug the cause from logs

Josh
Hi Ufuk,

I see, but in my case the failure caused the YARN application to move into a FINISHED/FAILED state, so the application itself is no longer running. How can I restart the application (or start a new YARN application) and ensure that it uses the checkpoint pointer stored in ZooKeeper?

Thanks,
Josh


Re: Flink Application on YARN failed on losing Job Manager | No recovery | Need help debug the cause from logs

Ufuk Celebi
If the configured ZooKeeper paths are still the same, the job should
be recovered automatically. On each submission, a unique ZK namespace
based on the app ID is used.

So in ZK you have:
/flink/app_id/...

To resume an old application, you would have to set that namespace
manually. You can do this via the -z flag
(https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/cli.html).

Does this work?
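
The naming rule described above can be sketched as a tiny Java function. This is an illustrative model under stated assumptions, not Flink's code: the namespace is taken to be the configured ZooKeeper root (`/flink` here) joined with either an explicitly set cluster id (what `-z` provides) or, by default, the YARN application ID. The application IDs in `main` are hypothetical.

```java
// Illustrative model of the ZooKeeper layout described above; not Flink's code.
class ZkNamespace {

    /**
     * zkRoot: the configured ZooKeeper root (/flink in this thread).
     * clusterId: value set via -z, or null/empty to fall back to the YARN application ID.
     */
    static String pathFor(String zkRoot, String clusterId, String yarnAppId) {
        String id = (clusterId == null || clusterId.isEmpty()) ? yarnAppId : clusterId;
        String root = zkRoot.endsWith("/") ? zkRoot.substring(0, zkRoot.length() - 1) : zkRoot;
        return root + "/" + id;
    }

    public static void main(String[] args) {
        String oldApp = "application_1478000000000_0001"; // hypothetical failed application
        String newApp = "application_1478000000000_0002"; // hypothetical resubmission
        // Default: the resubmitted job looks under a fresh, empty namespace.
        System.out.println(pathFor("/flink", null, newApp));   // /flink/application_1478000000000_0002
        // Overriding the cluster id points the new application at the old namespace.
        System.out.println(pathFor("/flink", oldApp, newApp)); // /flink/application_1478000000000_0001
    }
}
```

The point of the model is that nothing links a new submission to the old one unless the cluster id is overridden, which is why a plain resubmission starts from scratch.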


Re: Flink Application on YARN failed on losing Job Manager | No recovery | Need help debug the cause from logs

Josh
Thanks, I didn't know about the -z flag!

I haven't been able to get it to work, though (using yarn-cluster, with the ZooKeeper root configured to /flink in my flink-conf.yaml).

I can see my job directory in ZK under /flink/application_1477475694024_0015 and I've tried a few ways to restore the job:

./bin/flink run -m yarn-cluster -yz /application_1477475694024_0015 ....
./bin/flink run -m yarn-cluster -yz application_1477475694024_0015 ....
./bin/flink run -m yarn-cluster -yz /flink/application_1477475694024_0015/ ....
./bin/flink run -m yarn-cluster -yz /flink/application_1477475694024_0015 ....

The job starts from scratch each time, without restored state.

Am I doing something wrong? I've also tried -z instead of -yz, but since I'm using yarn-cluster to run a single job, I think it should be -yz.




Re: Flink Application on YARN failed on losing Job Manager | No recovery | Need help debug the cause from logs

Ufuk Celebi



Can you please check the JobManager logs of the initial job that you want to resume and look for a line like this:


Using '.../flink/application_...' as Zookeeper namespace.

Now you need to set the part after 'flink/' as the namespace, probably "application_1477475694024_0015" (from your last message).
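
Following this suggestion, the cluster id can be pulled out of the JobManager log mechanically. The Java helper below is hypothetical: it assumes the log line has roughly the shape quoted above (`Using '<...>/flink/<id>' as Zookeeper namespace.`, with `/flink` as the root, as in Josh's setup), and the exact wording may differ between Flink versions. It returns the part after the root, i.e. the value Ufuk says to pass, and it also reduces the path spellings Josh tried to that same bare form.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper; the log-line pattern is an assumption based on the
// line quoted in this thread and may differ between Flink versions.
class ClusterIdFinder {
    // Accepts straight or curly quotes around the path, case-insensitive "zookeeper".
    private static final Pattern NAMESPACE_LINE = Pattern.compile(
            "Using\\s+['\u2018\u2019\"]([^'\u2018\u2019\"]+)['\u2018\u2019\"]\\s+as\\s+zookeeper\\s+namespace",
            Pattern.CASE_INSENSITIVE);

    /** Reduces a path or id to the part after the ZooKeeper root, without slashes. */
    static String normalize(String pathOrId, String zkRoot) {
        String s = pathOrId.trim();
        while (s.endsWith("/")) {
            s = s.substring(0, s.length() - 1);
        }
        int at = s.lastIndexOf(zkRoot + "/");
        if (at >= 0) {
            s = s.substring(at + zkRoot.length() + 1); // drop everything up to and including "<root>/"
        }
        while (s.startsWith("/")) {
            s = s.substring(1);
        }
        return s;
    }

    /** Scans JobManager log lines for the namespace line and returns the cluster id. */
    static Optional<String> fromLog(List<String> logLines, String zkRoot) {
        for (String line : logLines) {
            Matcher m = NAMESPACE_LINE.matcher(line);
            if (m.find()) {
                return Optional.of(normalize(m.group(1), zkRoot));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        String[] tried = {
            "/application_1477475694024_0015",
            "application_1477475694024_0015",
            "/flink/application_1477475694024_0015/",
            "/flink/application_1477475694024_0015"
        };
        for (String t : tried) {
            System.out.println(normalize(t, "/flink")); // application_1477475694024_0015 each time
        }
        // Assumed log format, modelled on the line quoted in this thread.
        List<String> log = Arrays.asList(
            "... other JobManager output ...",
            "Using '/flink/application_1477475694024_0015' as Zookeeper namespace.");
        System.out.println(fromLog(log, "/flink").orElse("namespace line not found"));
    }
}
```

The helper only produces the bare id; which flag or config key Flink reads it from is as described in this message.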

The flag should be just -z. You can also set it in the Flink config file:

high-availability.cluster-id: application_1477475694024_0015

Does this help?

---

There is also a new feature in Flink 1.2 allowing you to persist every checkpoint externally. The feature is already in, but the configuration will be adjusted (https://github.com/apache/flink/pull/2752).

Currently you can configure it by specifying a checkpoint directory manually via:

state.checkpoints.dir: hdfs:///flink/checkpoints

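In the CheckpointConfig, you enable it via:

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
CheckpointConfig config = env.getCheckpointConfig();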
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
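
To help choose between the two cleanup modes, here is a toy Java model of the semantics the Flink 1.2 documentation gives for externalized checkpoints: with RETAIN_ON_CANCELLATION the checkpoint is kept when the job is cancelled, with DELETE_ON_CANCELLATION it is deleted on cancellation and only remains available if the job fails. The enum and method names are hypothetical stand-ins, not Flink's API, and only the FAILED and CANCELED outcomes are modelled.

```java
// Toy model of the documented cleanup semantics for externalized checkpoints;
// the names here are hypothetical, not Flink's API.
class ExternalizedRetention {

    enum CleanupMode { RETAIN_ON_CANCELLATION, DELETE_ON_CANCELLATION }

    enum Termination { FAILED, CANCELED }

    /** True if the externalized checkpoint is still available after the job ends this way. */
    static boolean retained(CleanupMode mode, Termination how) {
        switch (how) {
            case FAILED:
                return true; // kept in both modes, so a failed job can be resumed
            case CANCELED:
                return mode == CleanupMode.RETAIN_ON_CANCELLATION;
            default:
                throw new IllegalArgumentException("unexpected termination: " + how);
        }
    }

    public static void main(String[] args) {
        for (CleanupMode mode : CleanupMode.values()) {
            for (Termination how : Termination.values()) {
                System.out.println(mode + " / " + how + " -> retained: " + retained(mode, how));
            }
        }
    }
}
```

For Josh's scenario (the YARN application ends up FAILED), either mode keeps the checkpoint; the choice only matters when you cancel the job yourself. The Flink 1.2 checkpoint documentation describes resuming from a retained checkpoint by passing its metadata path to `flink run -s`, as with a savepoint; check the docs for your version before relying on it.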


– Ufuk