Hi,
I'm trying to enable HA for my Flink jobs running on AWS EMR. Following [1], I created a common Flink YARN session and am submitting all my jobs to it. I added these four config parameters:

    high-availability = zookeeper
    high-availability.storageDir =
    high-availability.zookeeper.path.root = /flink
    high-availability.zookeeper.quorum = <EMR's master node's DNS name>:2181

(The ZooKeeper that comes with EMR was used.)

The command to start that Flink YARN session is:

    flink-yarn-session -Dtaskmanager.memory.process.size=4g -nm FlinkCommonSession -z FlinkCommonSession -d

The first HA test (YARN application killed) went well. I killed the common session with "yarn application --kill <appId>" and created a new session with the same command, and the jobs were restored automatically once that session was up.

However, the second HA test (EMR cluster crashed) didn't work: the jobs were NOT restored after the common session was created on the new EMR cluster. (Attached: jobmanager.gz <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/jobmanager.gz>)

Should I expect the jobs to be restored in scenario no. 2 (EMR cluster crashed)? Am I missing something here?

Thanks for your help.
Regards,
Averell

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/yarn_setup.html

-- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
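The first test can be sketched as a small shell function. The name restart_common_session is hypothetical, and the YARN application id is assumed to be known already (for example from yarn application -list). The point it illustrates is the fixed -z value: it sets the ZooKeeper namespace (high-availability.cluster-id), so the new session looks for recovered jobs under the same ZooKeeper path as the killed one. As we understand the Flink YARN docs, without -z the namespace defaults to the YARN application id, which changes with every new session.

```shell
#!/usr/bin/env bash
set -euo pipefail

# Sketch of HA test no. 1 (YARN application killed): kill the common session
# and start a new one with the same command. The fixed -z value (ZooKeeper
# namespace) lets the new session find the jobs registered by the old one;
# without it, the namespace would default to the new YARN application id.
SESSION_NAME="FlinkCommonSession"

# Hypothetical helper; $1 is the YARN application id of the running session.
restart_common_session() {
  local app_id="$1"
  yarn application -kill "$app_id"
  flink-yarn-session -Dtaskmanager.memory.process.size=4g \
    -nm "$SESSION_NAME" -z "$SESSION_NAME" -d
}
```

This only covers the first scenario; for the second one, the new cluster must also be able to reach the same ZooKeeper data and the same high-availability.storageDir, which is what the rest of the thread is about.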
Hi,

Can you explain what "EMR cluster crashed" means in the second scenario? Can you also share:
- yarn.application-attempts in Flink
- yarn.resourcemanager.am.max-attempts in YARN
- the number of EMR master nodes (1 or 3)
- the EMR version

Regards,
Roman

On Mon, Oct 19, 2020 at 8:22 AM Averell <[hidden email]> wrote:
Hello Roman,
Thanks for your time. I'm using EMR 5.30.1 (Flink 1.10.0) with 1 master node. yarn.application-attempts is not set (does that mean unlimited?), while yarn.resourcemanager.am.max-attempts is 4.

By "EMR cluster crashed" I meant that the cluster is lost. Some scenarios that could lead to this are:
- The master node goes down.
- The cluster is accidentally or deliberately terminated.

I found a thread on our mailing list [1] in which Fabian mentioned a "pointer" stored in ZooKeeper. It looks like this piece of information is kept in ZooKeeper's dataDir, which by default is on the local storage of the EMR master node. I'm trying to move it to EFS in the hope that this helps, but I'm not sure whether this is the right approach.

Thanks for your help.
Regards,
Averell

[1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/HA-and-zookeeper-tp27093p27119.html
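The change Averell describes (moving ZooKeeper's dataDir to storage that a replacement cluster can also reach) could look roughly like the hypothetical shell helper below, which rewrites the dataDir line of a zoo.cfg file. The function name set_zk_datadir, the location of zoo.cfg on the EMR master, and the EFS mount point are assumptions; mounting EFS and restarting ZooKeeper afterwards are not shown.

```shell
#!/usr/bin/env bash
set -euo pipefail

# Hypothetical helper: point ZooKeeper's dataDir at a directory on shared
# storage (e.g. an EFS mount). Any existing dataDir= line is replaced and all
# other settings are kept as they are.
set_zk_datadir() {
  local cfg="$1" new_dir="$2" tmp
  [ -f "$cfg" ] || { echo "no such file: $cfg" >&2; return 1; }
  tmp="$(mktemp)"
  # grep -v exits 1 when it selects no lines, which is not an error here
  grep -v '^dataDir=' "$cfg" > "$tmp" || true
  printf 'dataDir=%s\n' "$new_dir" >> "$tmp"
  # Copy back with cat so the original file's owner and mode are preserved
  cat "$tmp" > "$cfg"
  rm -f "$tmp"
}
```

For example, set_zk_datadir /etc/zookeeper/conf/zoo.cfg /mnt/efs/zookeeper, where both paths are assumptions. If zoo.cfg also sets dataLogDir, ZooKeeper writes its transaction logs there instead of in dataDir, so that directory would need to be on the shared storage as well.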
Hello Averell,

I don't think ZK data is stored on a master node. Flink JobManager data is usually stored on a DFS, at the location set by "high-availability.storageDir" [1].

In either case, for Flink to be HA, YARN must also be HA, and I think this is not the case with a single master node. Please consider a multi-master EMR setup [2].

On Tue, Oct 20, 2020 at 12:13 AM Averell <[hidden email]> wrote:
Hello Roman,
Thanks for the answer. I already have high-availability.storageDir configured to an S3 location.

Our service is not that critical, so to save costs we are using a single-master EMR setup. I understand that we won't get YARN HA in that case; what I'm after is the ability to restore the service quickly after a failure. Without ZooKeeper, when such a failure happens, I need to find the last checkpoint of each job and restore from there. With the HA feature, I can just start a new flink-yarn-session and all jobs are restored.

I tried changing the ZooKeeper dataDir config to an EFS location that both the old and the new EMR clusters can access, and that worked.

However, I now have a new question: should I expect to be able to restore onto a newer version of Flink (e.g. state saved with Flink 1.10 and restored with Flink 1.11)? I tried it and got the error messages below. I'm not sure whether that's a bug or expected behaviour.

Thanks and best regards,
Averell

============
07:39:33.906 [main-EventThread] ERROR org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState - Authentication failed
07:40:11.585 [flink-akka.actor.default-dispatcher-2] ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Fatal error occurred in the cluster entrypoint.
org.apache.flink.runtime.dispatcher.DispatcherException: Could not start recovered job 6e5c12f1c352dd4e6200c40aebb65745.
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$handleRecoveredJobStartError$0(Dispatcher.java:222) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836) ~[?:1.8.0_265]
    at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811) ~[?:1.8.0_265]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_265]
    at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575) ~[?:1.8.0_265]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:753) ~[?:1.8.0_265]
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_265]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.11.0.jar:1.11.0]
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.client.JobExecutionException: Could not instantiate JobManager.
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_265]
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    ... 4 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not instantiate JobManager.
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_265]
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    ... 4 more
Caused by: java.lang.NullPointerException
    at java.util.Collections$UnmodifiableCollection.<init>(Collections.java:1028) ~[?:1.8.0_265]
    at java.util.Collections$UnmodifiableList.<init>(Collections.java:1304) ~[?:1.8.0_265]
    at java.util.Collections.unmodifiableList(Collections.java:1289) ~[?:1.8.0_265]
    at org.apache.flink.runtime.jobgraph.JobVertex.getOperatorCoordinators(JobVertex.java:352) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:232) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:814) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:228) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:269) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:242) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:229) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:119) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:272) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:140) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:388) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_265]
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
Hey Averell,

You should be able to migrate savepoints from Flink 1.10 to 1.11. Is there a simple way for me to reproduce this issue locally? This seems to be a rare but probably valid issue. Are you using any special operators (for example, the new source API)?

Best,
Robert

On Wed, Oct 21, 2020 at 11:07 AM Averell <[hidden email]> wrote:
Hey Averell,

To clarify: you should be able to migrate from 1.10 to 1.11 using a savepoint. Restoring from the state stored in ZooKeeper (for HA) with a newer Flink version won't work.

On Mon, Oct 26, 2020 at 5:05 PM Robert Metzger <[hidden email]> wrote:
Hello Robert,
Thanks for the info, that makes sense. I will take savepoints of my jobs and cancel them on 1.10, upgrade to 1.11, and then restore the jobs from those savepoints.

Thanks and regards,
Averell
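The savepoint-based upgrade Robert suggests and Averell plans can be sketched with the Flink CLI as two hypothetical shell functions. The first runs against the old 1.10 installation: it stops a job with a savepoint using flink stop -p (stop-with-savepoint; flink cancel -s is the older cancel-with-savepoint form) and prints the savepoint path. The second runs against the new 1.11 installation and resubmits the job from that savepoint with flink run -s. The function names, the job jar, and the exact wording of the "Savepoint completed. Path: ..." line that the first function parses are assumptions; check them against your Flink version before relying on this.

```shell
#!/usr/bin/env bash
set -euo pipefail

# Step 1 (old Flink 1.10 cluster): stop the job with a savepoint and print
# the savepoint path. Assumption: the CLI reports the path on a line of the
# form "Savepoint completed. Path: <path>".
stop_with_savepoint() {
  local job_id="$1" target_dir="$2" out path
  out="$(flink stop -p "$target_dir" "$job_id")" || return 1
  path="$(printf '%s\n' "$out" | sed -n 's/^Savepoint completed\. Path: //p')"
  if [ -z "$path" ]; then
    echo "no savepoint path found for job $job_id" >&2
    return 1
  fi
  printf '%s\n' "$path"
}

# Step 2 (new Flink 1.11 cluster): resubmit the job, detached, from the
# savepoint. Any arguments after the jar are passed through to the job.
resume_from_savepoint() {
  local savepoint_path="$1" jar="$2"
  shift 2
  flink run -d -s "$savepoint_path" "$jar" "$@"
}
```

A typical flow would be sp="$(stop_with_savepoint <jobId> <savepoint dir>)" on the old cluster, followed by resume_from_savepoint "$sp" <job jar> on the new one. Unlike the HA recovery path, this only carries over the operator state in the savepoint, not the JobGraph stored for HA, which matches Robert's point that HA state cannot be restored with a newer Flink version.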