Hi All,
I started my Flink application on YARN using flink run -m yarn-cluster; after running smoothly for 20 hours it failed. Ideally, the application should have recovered after losing the JobManager (which runs in the same container as the ApplicationMaster), given the fault-tolerant nature of Flink on YARN, but it didn't recover and failed.

Please help me debug the logs.

Thank you

Regards,
Anchit

Below are the logs:

2016-11-01 14:12:37,592 INFO org.apache.flink.runtime.client.JobClientActor - 11/01/2016 14:12:36 Parse & Map Record - (Visitor ID, Product List) -> Filtering None Objects -> Fetching Output(148/200) switched to RUNNING
2016-11-02 10:16:42,960 INFO org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider - Failing over to rm1
2016-11-02 10:17:24,026 INFO org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider - Failing over to rm2
2016-11-02 10:17:40,882 INFO org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider - Failing over to rm1
2016-11-02 10:24:41,964 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@10.66.245.26:47722] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
2016-11-02 10:24:56,311 WARN Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@10.66.245.26:47722]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: /10.66.245.26:47722
2016-11-02 10:24:56,315 INFO org.apache.flink.runtime.client.JobClientActor - Lost connection to JobManager akka.tcp://flink@10.66.245.26:47722/user/jobmanager. Triggering connection timeout.
2016-11-02 10:24:56,315 INFO org.apache.flink.runtime.client.JobClientActor - Disconnect from JobManager Actor[akka.tcp://flink@10.66.245.26:47722/user/jobmanager#1251121709].
2016-11-02 10:25:56,330 INFO org.apache.flink.runtime.client.JobClientActor - Terminate JobClientActor.
2016-11-02 10:25:56,331 INFO org.apache.flink.runtime.client.JobClientActor - Disconnect from JobManager null.
2016-11-02 10:25:56,333 ERROR org.apache.flink.client.CliFrontend - Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Communication with JobManager failed: Lost connection to the JobManager.
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:405)
    at org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:204)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:378)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:68)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:585)
    at com.tgt.prz.streaming.recs.drivers.SessionRecs2$.main(SessionRecs2.scala:126)
    at com.tgt.prz.streaming.recs.drivers.SessionRecs2.main(SessionRecs2.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:320)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
    at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:997)
    at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:994)
    at org.apache.flink.runtime.security.SecurityUtils$1.run(SecurityUtils.java:56)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
    at org.apache.flink.runtime.security.SecurityUtils.runSecured(SecurityUtils.java:53)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:994)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Communication with JobManager failed: Lost connection to the JobManager.
    at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:137)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:401)
    ... 24 more
Caused by: org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException: Lost connection to the JobManager.
    at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:252)
    at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:90)
    at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:70)
    at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2016-11-02 10:25:56,341 INFO org.apache.flink.yarn.YarnClusterClient - Sending shutdown request to the Application Master
2016-11-02 10:25:56,341 INFO org.apache.flink.yarn.YarnClusterClient - Start application client.
2016-11-02 10:25:56,344 WARN org.apache.flink.yarn.YarnClusterClient - YARN reported application state FAILED
2016-11-02 10:25:56,344 WARN org.apache.flink.yarn.YarnClusterClient - Diagnostics: Application application_1476277440022_40328 failed 1 times due to Attempt recovered after RM restartAM Container for appattempt_1476277440022_40328_000001 exited with exitCode: 243
For more detailed output, check application tracking page:http://mydns:8088/cluster/app/application_1476277440022_40328Then, click on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_e3066_1476277440022_40328_01_000001
Exit code: 243
Stack trace: ExitCodeException exitCode=243:
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:576)
    at org.apache.hadoop.util.Shell.run(Shell.java:487)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:753)
    at org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor.launchContainer(LinuxContainerExecutor.java:371)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Shell output: main : command provided 1
main : run as user is A12345
main : requested yarn user is A12345

Container exited with a non-zero exit code 243
Failing this attempt. Failing the application.
2016-11-02 10:25:56,346 INFO org.apache.flink.yarn.ApplicationClient - Notification about new leader address akka.tcp://flink@10.66.245.26:47722/user/jobmanager with session ID null.
2016-11-02 10:25:56,349 INFO org.apache.flink.yarn.ApplicationClient - Sending StopCluster request to JobManager.
2016-11-02 10:25:56,350 INFO org.apache.flink.yarn.ApplicationClient - Received address of new leader akka.tcp://flink@10.66.245.26:47722/user/jobmanager with session ID null.
2016-11-02 10:25:56,351 INFO org.apache.flink.yarn.ApplicationClient - Disconnect from JobManager null.
2016-11-02 10:25:56,353 INFO org.apache.flink.yarn.ApplicationClient - Trying to register at JobManager akka.tcp://flink@10.66.245.26:47722/user/jobmanager.
2016-11-02 10:25:56,363 WARN Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@10.66.245.26:47722]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: /10.66.245.26:47722
2016-11-02 10:25:56,870 INFO org.apache.flink.yarn.ApplicationClient - Trying to register at JobManager akka.tcp://flink@10.66.245.26:47722/user/jobmanager.
2016-11-02 10:25:57,369 INFO org.apache.flink.yarn.ApplicationClient - Sending StopCluster request to JobManager.
2016-11-02 10:25:57,889 INFO org.apache.flink.yarn.ApplicationClient - Trying to register at JobManager akka.tcp://flink@10.66.245.26:47722/user/jobmanager.
2016-11-02 10:25:58,389 INFO org.apache.flink.yarn.ApplicationClient - Sending StopCluster request to JobManager.
2016-11-02 10:25:59,410 INFO org.apache.flink.yarn.ApplicationClient - Sending StopCluster request to JobManager.
2016-11-02 10:25:59,909 INFO org.apache.flink.yarn.ApplicationClient - Trying to register at JobManager akka.tcp://flink@10.66.245.26:47722/user/jobmanager.
2016-11-02 10:26:00,429 INFO org.apache.flink.yarn.ApplicationClient - Sending StopCluster request to JobManager.
2016-11-02 10:26:01,449 INFO org.apache.flink.yarn.ApplicationClient - Sending StopCluster request to JobManager.
2016-11-02 10:26:02,469 INFO org.apache.flink.yarn.ApplicationClient - Sending StopCluster request to JobManager.
2016-11-02 10:26:03,489 INFO org.apache.flink.yarn.ApplicationClient - Sending StopCluster request to JobManager.
2016-11-02 10:26:03,929 INFO org.apache.flink.yarn.ApplicationClient - Trying to register at JobManager akka.tcp://flink@10.66.245.26:47722/user/jobmanager.
2016-11-02 10:26:03,935 WARN Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@10.66.245.26:47722]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: /10.66.245.26:47722
2016-11-02 10:26:04,509 INFO org.apache.flink.yarn.ApplicationClient - Sending StopCluster request to JobManager.
2016-11-02 10:26:05,529 INFO org.apache.flink.yarn.ApplicationClient - Sending StopCluster request to JobManager.
2016-11-02 10:26:06,345 WARN org.apache.flink.yarn.YarnClusterClient - Error while stopping YARN cluster.
java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:153)
    at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:86)
    at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:86)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.ready(package.scala:86)
    at scala.concurrent.Await.ready(package.scala)
    at org.apache.flink.yarn.YarnClusterClient.shutdownCluster(YarnClusterClient.java:366)
    at org.apache.flink.yarn.YarnClusterClient.finalizeCluster(YarnClusterClient.java:336)
    at org.apache.flink.client.program.ClusterClient.shutdown(ClusterClient.java:206)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:260)
    at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:997)
    at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:994)
    at org.apache.flink.runtime.security.SecurityUtils$1.run(SecurityUtils.java:56)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
    at org.apache.flink.runtime.security.SecurityUtils.runSecured(SecurityUtils.java:53)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:994)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
2016-11-02 10:26:06,347 INFO org.apache.flink.yarn.YarnClusterClient - Deleting files in hdfs://littleredns/user/A12345/.flink/application_1476277440022_40328
2016-11-02 10:26:06,530 INFO org.apache.flink.yarn.YarnClusterClient - Application application_1476277440022_40328 finished with state FAILED and final state FAILED at 1478100282775
2016-11-02 10:26:06,530 WARN org.apache.flink.yarn.YarnClusterClient - Application failed.
Diagnostics Application application_1476277440022_40328 failed 1 times due to Attempt recovered after RM restartAM Container for appattempt_1476277440022_40328_000001 exited with exitCode: 243
For more detailed output, check application tracking page:http://mydns/cluster/app/application_1476277440022_40328Then, click on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_e3066_1476277440022_40328_01_000001
Exit code: 243
Stack trace: ExitCodeException exitCode=243:
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:576)
    at org.apache.hadoop.util.Shell.run(Shell.java:487)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:753)
    at org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor.launchContainer(LinuxContainerExecutor.java:371)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Shell output: main : command provided 1
main : run as user is A12345
main : requested yarn user is A12345

Container exited with a non-zero exit code 243
Failing this attempt. Failing the application.
2016-11-02 10:26:06,531 WARN org.apache.flink.yarn.YarnClusterClient - If log aggregation is activated in the Hadoop cluster, we recommend to retrieve the full application log using this command:
    yarn logs -appReport application_1476277440022_40328
(It sometimes takes a few seconds until the logs are aggregated)
2016-11-02 10:26:06,531 INFO org.apache.flink.yarn.YarnClusterClient - YARN Client is shutting down
2016-11-02 10:26:06,532 INFO org.apache.flink.yarn.ApplicationClient - Stopped Application client.
2016-11-02 10:26:06,533 INFO org.apache.flink.yarn.ApplicationClient - Disconnect from JobManager null.
2016-11-02 10:26:06,536 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting down remote daemon.
2016-11-02 10:26:06,537 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote daemon shut down; proceeding with flushing remote transports.
2016-11-02 10:26:06,558 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Remoting shut down.
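The client log above only records that the ApplicationMaster container exited with code 243; the reason would normally be in the aggregated container logs. A minimal sketch of fetching and scanning them follows, assuming log aggregation is enabled and the `yarn` CLI is on the PATH. It uses the `-applicationId` option of `yarn logs` rather than the `-appReport` spelling printed in the message above; the output file name and the grep patterns are illustrative choices, not anything taken from the thread.

```shell
#!/bin/sh
# Application ID copied from the YARN diagnostics in the log above.
APP_ID="application_1476277440022_40328"
# Output file name is an arbitrary choice for this sketch.
LOG_FILE="${APP_ID}.log"
CMD="yarn logs -applicationId ${APP_ID}"

if command -v yarn >/dev/null 2>&1; then
  # Save the aggregated logs of all containers, including the AM/JobManager.
  $CMD > "$LOG_FILE" 2>&1 || echo "yarn logs failed; is log aggregation enabled?" >&2
  # Show the first lines that hint at why the AM container exited.
  grep -nE 'ERROR|Exception|exitCode|OutOfMemory|SIGTERM' "$LOG_FILE" | head -n 50
else
  # No yarn CLI here (e.g. a laptop): just print the command to run on a gateway node.
  echo "yarn CLI not found; run on a cluster node: $CMD"
fi
```

The JobManager log inside that output is usually the first place to look, since it covers the minutes around 10:16 to 10:24 when the ResourceManager failed over.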
Hi Anchit,
The application can crash for many different reasons, e.g. errors in user code or hardware/network failures. Have you configured high availability for YARN as described in the documentation?

https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/jobmanager_high_availability.html

-Max

On Wed, Nov 2, 2016 at 6:44 PM, Anchit Jatana <[hidden email]> wrote:
> Hi All,
>
> I started my flink application on YARN using flink run -m yarn-cluster,
> after running smoothly for 20 hrs it failed. Ideally the application should
> have recovered on losing the JobManager (which runs in the same container as
> the application master) pertaining to the fault tolerant nature of flink on
> YARN but it didn't recover and failed.
>
> Please help me debug the logs.
>
> Thank you
>
> Regards,
> Anchit
Hi Maximilian,
Thanks for your response. I'm running the application on a YARN cluster in 'yarn-cluster' mode, i.e. with the 'flink run -m yarn-cluster ..' command. Apart from setting the 'yarn.application-attempts: 10' property in conf/flink-conf.yaml, is there anything more that I need to configure to set up HA in 'yarn-cluster' mode?

Thank you

Regards,
Anchit
Hi Anchit,
The documentation mentions that you need Zookeeper in addition to setting the application attempts. Zookeeper is needed to retrieve the current leader for the client and to filter out old leaders in case multiple exist (old processes could even stay alive in Yarn). Moreover, it is needed to persist the state of the application.

-Max
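To make the point above concrete, here is a minimal sketch of a pre-flight check that reports which HA-related settings are missing when only the application attempts are configured. The class `HaConfigCheck` is hypothetical and not part of Flink. The key names are an assumption based on the Flink 1.2 `high-availability.*` naming; Flink 1.1 used `recovery.*` keys instead, so please compare the list with the documentation for your version.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical pre-flight check; not part of Flink. */
final class HaConfigCheck {

    // Assumed key names (Flink 1.2 style). Older releases used recovery.* keys,
    // and later releases renamed some of these, so treat the list as a sketch.
    static final List<String> REQUIRED_KEYS = Arrays.asList(
            "yarn.application-attempts",
            "high-availability",
            "high-availability.zookeeper.quorum",
            "high-availability.zookeeper.storageDir");

    /** Returns the required keys that are absent or blank in the given config. */
    static List<String> missingKeys(Map<String, String> conf) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = conf.get(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // The setup described in the question: only the attempts property is set.
        Map<String, String> conf =
                Collections.singletonMap("yarn.application-attempts", "10");
        System.out.println("Missing HA settings: " + missingKeys(conf));
    }
}
```

With only `yarn.application-attempts` set, the check lists the three ZooKeeper-related keys as missing, which mirrors the situation in the original question.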
I have a follow-up question to this: if I'm running a job in 'yarn-cluster' mode with HA and at some point the YARN application fails due to a hardware failure (i.e. the YARN application moves to the "FINISHED"/"FAILED" state), how can I restore the job from the most recent checkpoint?

I can use `flink run -m yarn-cluster -s s3://my-savepoints/id .....` to restore from a savepoint, but what if I haven't manually taken a savepoint recently?

Thanks,
Josh
No, you don't need to manually trigger a savepoint. With HA, checkpoints are persisted externally and a pointer to them is stored in ZooKeeper, so that they can be recovered after a JobManager failure.
Hi Ufuk,

I see, but in my case the failure caused the YARN application to move into a finished/failed state, so the application itself is no longer running. How can I restart the application (or start a new YARN application) and ensure that it uses the checkpoint pointer stored in ZooKeeper?

Thanks,
Josh
If the configured ZooKeeper paths are still the same, the job should be recovered automatically. On each submission, a unique ZK namespace is used, based on the app ID, so you have in ZK: /flink/app_id/... You would have to set that namespace manually to resume an old application. You can do this via the -z flag (https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/cli.html).

Does this work?
Thanks, I didn't know about the -z flag!

I haven't been able to get it to work though (using yarn-cluster, with a zookeeper root configured to /flink in my flink-conf.yaml).

I can see my job directory in ZK under /flink/application_1477475694024_0015 and I've tried a few ways to restore the job:

./bin/flink run -m yarn-cluster -yz /application_1477475694024_0015 ....
./bin/flink run -m yarn-cluster -yz application_1477475694024_0015 ....
./bin/flink run -m yarn-cluster -yz /flink/application_1477475694024_0015/ ....
./bin/flink run -m yarn-cluster -yz /flink/application_1477475694024_0015 ....

The job starts from scratch each time, without restored state.

Am I doing something wrong? I've also tried with -z instead of -yz, but I'm using yarn-cluster to run a single job, so I think it should be -yz.
Can you please check the JobManager logs of the initial job that you want to resume and look for a line like this:

Using '.../flink/application_...' as Zookeeper namespace.

Now you need to set the part after 'flink/' as the namespace, probably "application_1477475694024_0015" (from your last message). The flag should be just -z. You can also set it in the Flink config file:

high-availability.cluster-id: application_1477475694024_0015

Does this help?

---

There is also a new feature in Flink 1.2 that allows you to persist every checkpoint externally. The feature is already in, but the configuration will be adjusted (https://github.com/apache/flink/pull/2752). Currently you can configure it by specifying a checkpoint directory manually via:

state.checkpoints.dir: hdfs:///flink/checkpoints

In the CheckpointConfig you enable it via:

CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

– Ufuk
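As a rough illustration of the "part after 'flink/'" advice, the sketch below pulls the namespace out of such a JobManager log line and prints the matching config entry. `ZkNamespace` is a hypothetical helper, not part of Flink. The sample log line is made up from the ZooKeeper root and application ID mentioned in this thread, and both the exact log wording and the assumption that the namespace is the last path segment may not hold for every Flink version.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper; not part of Flink. */
final class ZkNamespace {

    // Assumed log wording, modelled on the line quoted in this thread.
    private static final Pattern LINE =
            Pattern.compile("Using '([^']+)' as Zookeeper namespace");

    /**
     * Returns the last path segment of the logged namespace path, or empty
     * if the line does not match the assumed wording.
     */
    static Optional<String> clusterId(String logLine) {
        Matcher m = LINE.matcher(logLine);
        if (!m.find()) {
            return Optional.empty();
        }
        String path = m.group(1);
        // Drop a trailing slash so "/flink/app/" and "/flink/app" agree.
        if (path.endsWith("/")) {
            path = path.substring(0, path.length() - 1);
        }
        return Optional.of(path.substring(path.lastIndexOf('/') + 1));
    }

    public static void main(String[] args) {
        // Made-up log line; the real one should be copied from the JobManager log.
        String line =
                "Using '/flink/application_1477475694024_0015' as Zookeeper namespace.";
        clusterId(line).ifPresent(id ->
                System.out.println("high-availability.cluster-id: " + id));
    }
}
```

The printed value is what would go into the config file or after the -z flag. Taking it from the log rather than guessing avoids the leading-slash and root-prefix variations tried earlier in the thread.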