HA on AWS EMR

8 messages

Averell
Hi,

I'm trying to enable HA for my Flink jobs running on AWS EMR.
Following [1], I created a common Flink YARN session and am submitting all my
jobs to it. I added these 4 config params:
    high-availability = zookeeper
    high-availability.storageDir =
    high-availability.zookeeper.path.root = /flink
    high-availability.zookeeper.quorum = <EMR's master node's DNS name>:2181
(The Zookeeper that came with EMR was used.)
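For reference, `flink-conf.yaml` uses `key: value` syntax. The sketch below writes the same four settings as a fragment to be merged into the cluster's `flink-conf.yaml` (assumed to be `/etc/flink/conf/flink-conf.yaml` on EMR; EMR's `flink-conf` configuration classification is another route). The bucket path and host are hypothetical placeholders, since the original `storageDir` value is not shown in the message.

```shell
#!/bin/sh
# Write the four HA settings in flink-conf.yaml syntax.
# <your-bucket> and <master-node-dns> are hypothetical placeholders.
# Merge the result into the cluster's flink-conf.yaml (assumed to be
# /etc/flink/conf/flink-conf.yaml on EMR) before starting the session.
cat > flink-conf-ha.yaml <<'EOF'
high-availability: zookeeper
high-availability.storageDir: s3://<your-bucket>/flink/ha
high-availability.zookeeper.path.root: /flink
high-availability.zookeeper.quorum: <master-node-dns>:2181
EOF

# Print only the keys so a misspelled one (e.g. "zookepper") is easy to spot;
# Flink generally ignores unknown keys instead of failing on them.
cut -d: -f1 flink-conf-ha.yaml
```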

The command I use to start that Flink YARN session is:
`flink-yarn-session -Dtaskmanager.memory.process.size=4g -nm FlinkCommonSession -z FlinkCommonSession -d`
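A sketch of the same command with each flag annotated, based on the options documented for Flink's `yarn-session.sh` (EMR's `flink-yarn-session` wrapper is assumed to pass them through unchanged). The `-z` value sets the ZooKeeper namespace for the session's HA data, so reusing the same value when recreating the session is presumably what lets the new session find the old jobs.

```shell
#!/bin/sh
# Arguments from the command above, one per line so each can be annotated.
set -- \
  -Dtaskmanager.memory.process.size=4g \
  -nm FlinkCommonSession \
  -z FlinkCommonSession \
  -d
# -D<key>=<value>  dynamic property overriding flink-conf.yaml
# -nm <name>       YARN application name (handy for finding the app ID later)
# -z <namespace>   ZooKeeper namespace for this session's HA data;
#                  reuse the same value when recreating the session
# -d               detached mode: return once the session is up

if command -v flink-yarn-session >/dev/null 2>&1; then
  flink-yarn-session "$@"
else
  echo "flink-yarn-session not found; would run: flink-yarn-session $*"
fi
```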

The first HA test (YARN application killed) went well: I killed that
common session with `yarn application --kill <appId>`, created a
new session using the same command, and the jobs were restored
automatically once that session was up.
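The first test can be scripted roughly as follows. `find_app_id` is a hypothetical helper that picks the session's ID (the name set with `-nm`) out of `yarn application -list` output; it assumes the name has no spaces and sits in the second column. The kill uses the `yarn application -kill <Application ID>` form from the YARN CLI documentation.

```shell
#!/bin/sh
# Hypothetical helper: read `yarn application -list` output on stdin and
# print the ID of the application whose name (2nd column) equals $1.
find_app_id() {
  awk -v name="$1" '$1 ~ /^application_/ && $2 == name { print $1 }'
}

if command -v yarn >/dev/null 2>&1; then
  app_id=$(yarn application -list -appStates RUNNING 2>/dev/null |
    find_app_id FlinkCommonSession)
  if [ -n "$app_id" ]; then
    yarn application -kill "$app_id"
    # ...then start a new session with the same flink-yarn-session command.
  fi
fi
```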

However, the 2nd HA test (EMR cluster crashed) didn't work: the *jobs were not
restored* after the common session was created on the new EMR cluster
(attached: jobmanager.gz
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/jobmanager.gz>).

Should I expect the jobs to be restored in scenario no. 2 (EMR cluster crashed)?
Am I missing something here?

Thanks for your help.

Regards,
Averell

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/yarn_setup.html




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: HA on AWS EMR

r_khachatryan
Hi,

Can you explain what "EMR cluster crashed" means in the 2nd scenario?
Can you also share:
- yarn.application-attempts in Flink
- yarn.resourcemanager.am.max-attempts in Yarn
- number of EMR master nodes (1 or 3)
- EMR version?

Regards,
Roman


On Mon, Oct 19, 2020 at 8:22 AM Averell <[hidden email]> wrote:
[...]

Re: HA on AWS EMR

Averell
Hello Roman,

Thanks for your time.
I'm using EMR 5.30.1 (Flink 1.10.0) with 1 master node.
`yarn.application-attempts` is not set (does that mean unlimited?), while
`yarn.resourcemanager.am.max-attempts` is 4.
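Rather than relying on the default, the attempt count can be pinned explicitly. Flink's YARN documentation notes that the effective number of attempts is bounded by YARN's `yarn.resourcemanager.am.max-attempts`, so on this cluster it presumably cannot exceed 4 whatever Flink's default is. The value below is only an illustration, and these restarts happen inside the same YARN cluster, so they do not cover losing the EMR cluster itself.

```shell
#!/bin/sh
# Illustrative only: allow up to 4 YARN application attempts for the session,
# matching yarn.resourcemanager.am.max-attempts on this cluster (YARN caps
# the value anyway). Appends to a flink-conf.yaml fragment file.
cat >> flink-conf-ha.yaml <<'EOF'
yarn.application-attempts: 4
EOF
grep '^yarn.application-attempts' flink-conf-ha.yaml
```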

By "EMR cluster crashed" I meant that the cluster is lost. Some scenarios
that could lead to this are:
  - The master node is down
  - The cluster is accidentally or deliberately terminated.

I found a thread on our mailing list [1] in which Fabian mentioned a
"pointer" stored in Zookeeper. It looks like this piece of information is
kept in Zookeeper's dataDir, which by default is on the local
storage of the EMR master node. I'm trying to move it to EFS in the
hope that this helps, but I'm not sure whether this is the right approach.
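A sketch of the `dataDir` change described above, assuming the ZooKeeper config on the EMR master is `/etc/zookeeper/conf/zoo.cfg` and that EFS is mounted at `/mnt/efs` (both paths are assumptions; `set_zk_data_dir` is a hypothetical helper). It does not copy existing data from the old `dataDir`, ZooKeeper has to be restarted afterwards, and only one ZooKeeper server should use that directory at a time.

```shell
#!/bin/sh
# Hypothetical helper: set (or add) dataDir in a zoo.cfg file.
# $1 = path to zoo.cfg, $2 = new data directory. Keeps a .bak copy.
set_zk_data_dir() {
  if grep -q '^dataDir=' "$1"; then
    sed -i.bak "s|^dataDir=.*|dataDir=$2|" "$1"
  else
    cp "$1" "$1.bak"
    printf 'dataDir=%s\n' "$2" >> "$1"
  fi
}

ZOO_CFG=/etc/zookeeper/conf/zoo.cfg   # assumed EMR location
EFS_DATA_DIR=/mnt/efs/zookeeper       # assumed EFS mount point

if [ -w "$ZOO_CFG" ]; then
  mkdir -p "$EFS_DATA_DIR"
  set_zk_data_dir "$ZOO_CFG" "$EFS_DATA_DIR"
  # Restart ZooKeeper afterwards; how depends on the EMR release.
fi
```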

Thanks for your help.
Regards,
Averell


[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/HA-and-zookeeper-tp27093p27119.html


Re: HA on AWS EMR

r_khachatryan
Hello Averell,

I don't think ZK data is stored on a master node. Flink JM data is usually stored on DFS, as configured by "high-availability.storageDir" [1].

In either case, for Flink to be HA, YARN should also be HA, and I don't think that is the case with a single master node. Please consider a multi-master EMR setup [2].
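One way to see what Flink keeps in ZooKeeper versus in `high-availability.storageDir` is to list the znodes under the HA root. Assuming the root `/flink` and namespace `FlinkCommonSession` from the first message, job graphs are expected under `/flink/FlinkCommonSession/jobgraphs` (Flink's default sub-path); those znodes hold small pointers to the actual data in the storage directory. `zk_ha_path` is a hypothetical helper, and the `zkCli.sh` location on EMR is an assumption.

```shell
#!/bin/sh
# Hypothetical helper: join the HA root, the session namespace (-z) and a
# sub-path into one znode path, e.g. /flink/FlinkCommonSession/jobgraphs.
zk_ha_path() {
  printf '/%s/%s/%s\n' "${1#/}" "$2" "${3#/}"
}

ZK_SERVER="<master-node-dns>:2181"        # placeholder, as in the first message
ZK_CLI=/usr/lib/zookeeper/bin/zkCli.sh    # assumed EMR location

if [ -x "$ZK_CLI" ]; then
  # Each child znode is named after a job ID; its data points into storageDir.
  "$ZK_CLI" -server "$ZK_SERVER" ls "$(zk_ha_path /flink FlinkCommonSession jobgraphs)"
fi
```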

On Tue, Oct 20, 2020 at 12:13 AM Averell <[hidden email]> wrote:
[...]

Re: HA on AWS EMR

Averell
Hello Roman,

Thanks for the answer.
I already have `high-availability.storageDir` configured to an S3
location. Our service is not that critical, so to save cost we are
using a single-master EMR setup. I understand that we won't get YARN HA
in that case, but what I want is the ability to quickly restore the
service after a failure. Without Zookeeper, when such a failure
happens, I need to find the last checkpoint of each job and restore from
there. With the HA feature, I can just start a new
flink-yarn-session and all jobs are restored.
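For the manual fallback mentioned above, Flink's documented way to resume from a retained checkpoint is `flink run -s <checkpoint path> ...`. Retained checkpoints are laid out as `<checkpoint dir>/<job id>/chk-<n>/`, so finding the latest one mostly means picking the highest `n`. This assumes the jobs use retained (externalized) checkpoints; `latest_checkpoint` is a hypothetical helper and the S3 paths are placeholders.

```shell
#!/bin/sh
# Hypothetical helper: read a listing of checkpoint directories on stdin
# (e.g. "PRE chk-41/" lines from `aws s3 ls`) and print the highest chk-<n>.
latest_checkpoint() {
  grep -o 'chk-[0-9][0-9]*' | sort -t- -k2,2n | tail -n 1
}

# Example use (placeholders in <...>):
#   CP_DIR="s3://<your-bucket>/<checkpoint-dir>/<job-id>"
#   chk=$(aws s3 ls "$CP_DIR/" | latest_checkpoint)
#   flink run -d -s "$CP_DIR/$chk" <job-jar> [job args]
```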

I tried changing the Zookeeper dataDir config to an EFS location that both the
old and new EMR clusters can access, and that worked.

However, I now have a new question: should I expect to be able to restore with a newer
version of Flink (e.g. saved with Flink 1.10 and restored with Flink 1.11)? I
tried and got the error messages below. I'm not sure whether that's a bug or
expected behaviour.

Thanks and best regards,
Averell

============
07:39:33.906 [main-EventThread] ERROR org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState - Authentication failed
07:40:11.585 [flink-akka.actor.default-dispatcher-2] ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Fatal error occurred in the cluster entrypoint.
org.apache.flink.runtime.dispatcher.DispatcherException: Could not start recovered job 6e5c12f1c352dd4e6200c40aebb65745.
        at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$handleRecoveredJobStartError$0(Dispatcher.java:222) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836) ~[?:1.8.0_265]
        at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811) ~[?:1.8.0_265]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_265]
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575) ~[?:1.8.0_265]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:753) ~[?:1.8.0_265]
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_265]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.11.0.jar:1.11.0]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.11.0.jar:1.11.0]
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.11.0.jar:1.11.0]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.11.0.jar:1.11.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.11.0.jar:1.11.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.11.0.jar:1.11.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.11.0.jar:1.11.0]
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.11.0.jar:1.11.0]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.11.0.jar:1.11.0]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.11.0.jar:1.11.0]
        at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.11.0.jar:1.11.0]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.11.0.jar:1.11.0]
        at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.11.0.jar:1.11.0]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.11.0.jar:1.11.0]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.11.0.jar:1.11.0]
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.11.0.jar:1.11.0]
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.11.0.jar:1.11.0]
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.11.0.jar:1.11.0]
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.client.JobExecutionException: Could not instantiate JobManager.
        at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_265]
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        ... 4 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not instantiate JobManager.
        at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_265]
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        ... 4 more
Caused by: java.lang.NullPointerException
        at java.util.Collections$UnmodifiableCollection.<init>(Collections.java:1028) ~[?:1.8.0_265]
        at java.util.Collections$UnmodifiableList.<init>(Collections.java:1304) ~[?:1.8.0_265]
        at java.util.Collections.unmodifiableList(Collections.java:1289) ~[?:1.8.0_265]
        at org.apache.flink.runtime.jobgraph.JobVertex.getOperatorCoordinators(JobVertex.java:352) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:232) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:814) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:228) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:269) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:242) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:229) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:119) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:272) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:140) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:388) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_265]
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) ~[flink-dist_2.11-1.11.0.jar:1.11.0]


Re: HA on AWS EMR

rmetzger0
Hey Averell,

you should be able to migrate savepoints from Flink 1.10 to 1.11.

Is there a simple way for me to reproduce this issue locally? This seems to be a rare but probably valid issue. Are you using any special operators (like the new source API)?

Best,
Robert

On Wed, Oct 21, 2020 at 11:07 AM Averell <[hidden email]> wrote:
[...]

Re: HA on AWS EMR

rmetzger0
Hey Averell,

to clarify: You should be able to migrate using a savepoint from 1.10 to 1.11. Restoring from the state stored in Zookeeper (for HA) with a newer Flink version won't work.
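The savepoint-based migration could look roughly like this with the Flink CLI: `flink stop -p <dir> <jobId>` (available in Flink 1.10) takes a savepoint and stops the job, and `flink run -s <savepointPath> ...` submits the job on the new version from that savepoint. The helper names, savepoint directory and jar are hypothetical, and the CLI is assumed to already point at the right YARN session. Starting the 1.11 session with a fresh `-z` namespace is one possible precaution against it trying to recover 1.10 job graphs from ZooKeeper.

```shell
#!/bin/sh
# Hypothetical upgrade helpers; SAVEPOINT_DIR and the jar are placeholders.
SAVEPOINT_DIR="s3://<your-bucket>/flink/savepoints"

# Step 1, against the Flink 1.10 session: take a savepoint and stop the job.
# The CLI prints the path of the savepoint it wrote.
stop_with_savepoint() {  # $1 = job ID
  flink stop -p "$SAVEPOINT_DIR" "$1"
}

# Step 2, against the new Flink 1.11 session: resubmit from that savepoint.
resume_from_savepoint() {  # $1 = savepoint path, $2 = job jar, rest = job args
  sp=$1
  jar=$2
  shift 2
  flink run -d -s "$sp" "$jar" "$@"
}

# Usage (placeholders):
#   stop_with_savepoint <job-id>                                  # Flink 1.10 CLI
#   resume_from_savepoint <savepoint-path> <job-jar> [job args]   # Flink 1.11 CLI
```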

On Mon, Oct 26, 2020 at 5:05 PM Robert Metzger <[hidden email]> wrote:
[...]

Re: HA on AWS EMR

Averell
Hello Robert,

Thanks for the info. That makes sense. I will take savepoints of my jobs and
cancel them on 1.10, upgrade to 1.11, and restore the jobs from the savepoints.

Thanks and regards,
Averell
