Jobmanager Crashes with Kubernetes HA When Restart Kubernetes Master Node


Jerome Li

Hi,

 

I am running Flink v1.12.2 in standalone mode on Kubernetes, with the native Kubernetes HA services enabled.

HA works well when either a jobmanager or a taskmanager pod is lost or crashes.

But when I restart a Kubernetes master node, the jobmanager pod always crashes and restarts. This causes the entire Flink cluster to restart, and most of the taskmanager pods restart as well.

I didn't see this issue when using ZooKeeper for HA. I'm not sure whether this is a bug that should be fixed or whether there is a workaround.

Below are my Flink settings.

Job-Manager
flink-conf.yaml:
----
jobmanager.rpc.address: streakerflink-jobmanager

high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.cluster-id: /streaker
high-availability.jobmanager.port: 6123
high-availability.storageDir: hdfs://hdfs-namenode-0.hdfs-namenode:8020/flink
kubernetes.cluster-id: streaker

rest.address: streakerflink-jobmanager
rest.bind-port: 8081
rest.port: 8081

state.checkpoints.dir: hdfs://hdfs-namenode-0.hdfs-namenode:8020/flink/streaker

blob.server.port: 6124
metrics.internal.query-service.port: 6125
metrics.reporters: prom
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9999

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647
restart-strategy.fixed-delay.delay: 5 s

jobmanager.memory.process.size: 1768m

parallelism.default: 1

task.cancellation.timeout: 2000

web.log.path: /opt/flink/log/output.log
jobmanager.web.log.path: /opt/flink/log/output.log

web.submit.enable: false

 

Task-Manager
flink-conf.yaml:
----
jobmanager.rpc.address: streakerflink-jobmanager

high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.cluster-id: /streaker
high-availability.storageDir: hdfs://hdfs-namenode-0.hdfs-namenode:8020/flink
kubernetes.cluster-id: streaker

taskmanager.network.bind-policy: ip

taskmanager.data.port: 6121
taskmanager.rpc.port: 6122

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647
restart-strategy.fixed-delay.delay: 5 s

taskmanager.memory.task.heap.size: 9728m
taskmanager.memory.framework.off-heap.size: 512m
taskmanager.memory.managed.size: 512m
taskmanager.memory.jvm-metaspace.size: 256m
taskmanager.memory.jvm-overhead.max: 3g
taskmanager.memory.jvm-overhead.fraction: 0.035
taskmanager.memory.network.fraction: 0.03
taskmanager.memory.network.max: 3g
taskmanager.numberOfTaskSlots: 1

taskmanager.jvm-exit-on-oom: true

metrics.internal.query-service.port: 6125
metrics.reporters: prom
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9999

web.log.path: /opt/flink/log/output.log
taskmanager.log.path: /opt/flink/log/output.log

task.cancellation.timeout: 2000

 

Any help will be appreciated!

 

Thanks,

Jerome


Re: Jobmanager Crashes with Kubernetes HA When Restart Kubernetes Master Node

Yang Wang
By "restart master node", do you mean restarting the K8s master components (e.g. API server, etcd)?

Even if the master components are restarted, the Flink JobManager and TaskManagers should eventually recover and resume work.
Could you please share the JobManager logs so that we can debug why it crashed?


Best,
Yang



Re: Jobmanager Crashes with Kubernetes HA When Restart Kubernetes Master Node

Jerome Li

Hi Yang,

 

Thanks for getting back to me.

 

By “restart master node”, I mean running “kubectl get nodes” to find a node whose role is master, then using “ssh” to log in to one of the master nodes as the ubuntu user and running “sudo /sbin/reboot -f” to restart it.

 

It looks like the JobManager then cancels the running job and logs the following:

2021-05-26 18:28:37,997 [INFO] org.apache.flink.runtime.executiongraph.ExecutionGraph       - Discarding the results produced by task execution 34eb9f5009dc7cf07117e720e7d393de.

2021-05-26 18:28:37,999 [INFO] org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore - Suspending

2021-05-26 18:28:37,999 [INFO] org.apache.flink.kubernetes.highavailability.KubernetesCheckpointIDCounter - Shutting down.

2021-05-26 18:28:38,000 [INFO] org.apache.flink.runtime.executiongraph.ExecutionGraph       - Job 74fc5c858e50f5efc91db9ee16c17a8c has been suspended.

2021-05-26 18:28:38,007 [INFO] org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl     - Suspending SlotPool.

2021-05-26 18:28:38,007 [INFO] org.apache.flink.runtime.jobmaster.JobMaster                 - Close ResourceManager connection 5bac86fb0b5c984ef429225b8de82cc0: JobManager is no longer the leader..

2021-05-26 18:28:38,019 [INFO] org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl      - JobManager runner for job hogger (74fc5c858e50f5efc91db9ee16c17a8c) was granted leadership with session id 14b9004a-3807-42e8-ac03-c0d77efe5611 at akka.tcp://flink@hoggerflink-jobmanager:6123/user/rpc/jobmanager_2.

2021-05-26 18:28:38,292 [INFO] org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor         - The rpc endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. Discarding message org.apache.flink.runtime.rpc.messages.RemoteFencedMessage until processing is started.

2021-05-26 18:28:38,292 [INFO] org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor         - The rpc endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. Discarding message org.apache.flink.runtime.rpc.messages.RemoteFencedMessage until processing is started.

2021-05-26 18:28:38,292 [INFO] org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor         - The rpc endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. Discarding message org.apache.flink.runtime.rpc.messages.RemoteFencedMessage until processing is started.

2021-05-26 18:28:38,293 [INFO] org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor         - The rpc endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. Discarding message org.apache.flink.runtime.rpc.messages.RemoteFencedMessage until processing is started.

2021-05-26 18:28:38,293 [INFO] org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor         - The rpc endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. Discarding message org.apache.flink.runtime.rpc.messages.RemoteFencedMessage until processing is started.

2021-05-26 18:28:38,293 [INFO] org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor         - The rpc endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. Discarding message org.apache.flink.runtime.rpc.messages.RemoteFencedMessage until processing is started.

2021-05-26 18:28:38,293 [INFO] org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor         - The rpc endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. Discarding message org.apache.flink.runtime.rpc.messages.RemoteFencedMessage until processing is started.

2021-05-26 18:28:38,293 [INFO] org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor         - The rpc endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. Discarding message org.apache.flink.runtime.rpc.messages.LocalFencedMessage until processing is started.

2021-05-26 18:28:38,293 [INFO] org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor         - The rpc endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. Discarding message org.apache.flink.runtime.rpc.messages.LocalFencedMessage until processing is started.

2021-05-26 18:28:38,295 [INFO] org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor         - The rpc endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. Discarding message org.apache.flink.runtime.rpc.messages.LocalFencedMessage until processing is started.

2021-05-26 18:28:38,295 [INFO] org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor         - The rpc endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. Discarding message org.apache.flink.runtime.rpc.messages.LocalFencedMessage until processing is started.

2021-05-26 18:28:38,295 [INFO] org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor         - The rpc endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. Discarding message org.apache.flink.runtime.rpc.messages.RemoteFencedMessage until processing is started.

2021-05-26 18:28:38,296 [INFO] org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor         - The rpc endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. Discarding message org.apache.flink.runtime.rpc.messages.RemoteFencedMessage until processing is started.

2021-05-26 18:28:38,296 [INFO] org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor         - The rpc endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. Discarding message org.apache.flink.runtime.rpc.messages.LocalFencedMessage until processing is started.

2021-05-26 18:28:38,299 [ERROR] org.apache.flink.runtime.entrypoint.ClusterEntrypoint        - Fatal error occurred in the cluster entrypoint.

org.apache.flink.util.FlinkException: JobMaster for job 74fc5c858e50f5efc91db9ee16c17a8c failed.

       at org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:887) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at org.apache.flink.runtime.dispatcher.Dispatcher.dispatcherJobFailed(Dispatcher.java:465) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$3(Dispatcher.java:426) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930) ~[?:?]

       at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907) ~[?:?]

       at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]

       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.12.2.jar:1.12.2]

       at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-dist_2.12-1.12.2.jar:1.12.2]

       at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.12.2.jar:1.12.2]

       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.12.2.jar:1.12.2]

       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.12.2.jar:1.12.2]

       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.12.2.jar:1.12.2]

Caused by: org.apache.flink.util.FlinkException: Could not start the job manager.

       at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.lambda$handleException$7(JobManagerRunnerImpl.java:456) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]

       at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]

       at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]

       at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]

       at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1044) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.dispatch.OnComplete.internal(Future.scala:263) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.dispatch.OnComplete.internal(Future.scala:261) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.actor.ActorRef.tell(ActorRef.scala:126) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleCallAsync(AkkaRpcActor.java:423) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:210) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:100) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       ... 21 more

Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: DefaultLeaderRetrievalService can only be started once.

       at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[?:?]

       at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[?:?]

       at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:704) ~[?:?]

       at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]

       at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]

       at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1044) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.dispatch.OnComplete.internal(Future.scala:263) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.dispatch.OnComplete.internal(Future.scala:261) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.actor.ActorRef.tell(ActorRef.scala:126) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleCallAsync(AkkaRpcActor.java:423) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:210) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:100) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       ... 21 more

Caused by: java.lang.IllegalStateException: DefaultLeaderRetrievalService can only be started once.

       at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService.start(DefaultLeaderRetrievalService.java:89) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at org.apache.flink.runtime.jobmaster.JobMaster.startJobMasterServices(JobMaster.java:891) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:864) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at org.apache.flink.runtime.jobmaster.JobMaster.lambda$start$1(JobMaster.java:381) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleCallAsync(AkkaRpcActor.java:419) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:210) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:100) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       ... 21 more

2021-05-26 18:28:38,310 [INFO] org.apache.flink.runtime.blob.BlobServer                     - Stopped BLOB server at 0.0.0.0:6124
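
For readers following the trace: the innermost cause is a start-once check failing in DefaultLeaderRetrievalService.start, shortly after the log shows the JobMaster losing leadership (18:28:38,007) and being granted it again (18:28:38,019) in the same process. The sketch below is a minimal, hypothetical illustration of that failure pattern in plain Java. StartOnceService and StartOnceDemo are made-up names, and this is not Flink's implementation.

```java
// Hypothetical illustration only; these classes are not part of Flink.
final class StartOnceService {
    private boolean started = false;

    /** Starts the service. A second call fails, mirroring a start-once precondition. */
    void start() {
        if (started) {
            throw new IllegalStateException("StartOnceService can only be started once.");
        }
        started = true;
    }
}

final class StartOnceDemo {
    /** Simulates a component that reuses the same service instance each time it is granted leadership. */
    static String onLeadershipGranted(StartOnceService service) {
        try {
            service.start();
            return "started";
        } catch (IllegalStateException e) {
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        StartOnceService service = new StartOnceService();
        // First leadership grant: the service starts normally.
        System.out.println(onLeadershipGranted(service));
        // Leadership was lost and granted again: the reused instance rejects the second start.
        System.out.println(onLeadershipGranted(service));
    }
}
```

Running main prints "started" followed by a "failed: ..." line. That is the same shape as the trace above: the second start() on a reused instance trips the guard.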

 

Eventually it gets back to work, but sometimes it does not. Some of the taskmanagers cannot identify the jobmanager address, and I have to manually restart the stale taskmanagers.

 

Is this the desired Flink behavior? Is it a bug? Or am I missing something?

 

Best,

Jerome

 

 



Re: Jobmanager Crashes with Kubernetes HA When Restart Kubernetes Master Node

Yang Wang
I think the exception you attached has been fixed via FLINK-22597[1]. Could you please try the latest version?

Moreover, it is not the desired Flink behavior for a TaskManager to fail to retrieve the new JobManager address and re-register successfully. Please share the logs of the stale TaskManagers so that we can continue debugging.

Best,
Yang


       at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.12.2.jar:1.12.2]

       at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-dist_2.12-1.12.2.jar:1.12.2]

       at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.12.2.jar:1.12.2]

       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.12.2.jar:1.12.2]

       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.12.2.jar:1.12.2]

       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.12.2.jar:1.12.2]

Caused by: org.apache.flink.util.FlinkException: Could not start the job manager.

       at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.lambda$handleException$7(JobManagerRunnerImpl.java:456) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]

       at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]

       at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]

       at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]

       at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1044) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.dispatch.OnComplete.internal(Future.scala:263) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.dispatch.OnComplete.internal(Future.scala:261) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.actor.ActorRef.tell(ActorRef.scala:126) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleCallAsync(AkkaRpcActor.java:423) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:210) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:100) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       ... 21 more

Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: DefaultLeaderRetrievalService can only be started once.

       at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[?:?]

       at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[?:?]

       at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:704) ~[?:?]

       at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]

       at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]

       at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1044) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.dispatch.OnComplete.internal(Future.scala:263) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.dispatch.OnComplete.internal(Future.scala:261) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at akka.actor.ActorRef.tell(ActorRef.scala:126) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleCallAsync(AkkaRpcActor.java:423) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:210) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:100) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       ... 21 more

Caused by: java.lang.IllegalStateException: DefaultLeaderRetrievalService can only be started once.

       at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService.start(DefaultLeaderRetrievalService.java:89) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at org.apache.flink.runtime.jobmaster.JobMaster.startJobMasterServices(JobMaster.java:891) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:864) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at org.apache.flink.runtime.jobmaster.JobMaster.lambda$start$1(JobMaster.java:381) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleCallAsync(AkkaRpcActor.java:419) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:210) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:100) ~[flink-dist_2.12-1.12.2.jar:1.12.2]

       ... 21 more

2021-05-26 18:28:38,310 [INFO] org.apache.flink.runtime.blob.BlobServer                     - Stopped BLOB server at 0.0.0.0:6124
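
For readers following the trace: the innermost cause is an IllegalStateException raised by Preconditions.checkState inside DefaultLeaderRetrievalService.start, reached from JobMaster.startJobMasterServices. The message suggests that start() was called a second time on the same service instance. The sketch below only illustrates that kind of start-once guard in plain Java. The class StartOnceService is hypothetical and is not Flink's implementation.

```java
// Illustrative sketch only: a hypothetical service that may be started once.
// This is NOT Flink's DefaultLeaderRetrievalService; all names are made up.
final class StartOnceService {
    private boolean started = false;

    // A state precondition similar in spirit to a checkState-style guard:
    // the first call flips the flag, any later call is rejected.
    synchronized void start() {
        if (started) {
            throw new IllegalStateException("StartOnceService can only be started once.");
        }
        started = true;
    }

    synchronized boolean isStarted() {
        return started;
    }

    public static void main(String[] args) {
        StartOnceService service = new StartOnceService();
        service.start(); // first call succeeds
        try {
            service.start(); // second call trips the guard
        } catch (IllegalStateException e) {
            System.out.println("Second start rejected: " + e.getMessage());
        }
    }
}
```

With a guard of this shape, any code path that re-runs the start sequence on an already started instance fails immediately, which would be consistent with the exception chain shown in the log.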

 

Eventually it recovers, but not always. Sometimes some of the taskmanagers cannot identify the jobmanager address, and I have to manually restart the stale taskmanagers.

 

Is this the expected Flink behavior, is it a bug, or am I missing something?

 

Best,

Jerome

 

 

From: Yang Wang <[hidden email]>
Date: Tuesday, May 25, 2021 at 1:03 AM
To: Jerome Li <[hidden email]>
Cc: [hidden email] <[hidden email]>
Subject: Re: Jobmanager Crashes with Kubernetes HA When Restart Kubernetes Master Node

By "restart master node", do you mean restarting the K8s master components (e.g. APIServer, ETCD, etc.)?

 

Even if the master components are restarted, the Flink JobManager and TaskManager should eventually recover.

Could you please share the JobManager logs so that we can debug why it crashed?
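
When going through a long JobManager log, the last "Caused by:" entry of a stack trace is usually the quickest pointer to the root cause. Below is a minimal sketch in plain Java that pulls that line out of a log text. The class and method names are hypothetical and not part of Flink, and the sketch assumes the log contains a single exception chain (with several chains it simply returns the last one).

```java
// Hypothetical helper, not part of Flink: finds the innermost "Caused by:" line.
final class RootCauseFinder {
    private static final String MARKER = "Caused by:";

    // Returns the text after the last "Caused by:" marker in the log,
    // or an empty string if the log contains no such line.
    static String innermostCause(String log) {
        String result = "";
        for (String line : log.split("\\R")) { // \R matches any line break
            String trimmed = line.trim();
            if (trimmed.startsWith(MARKER)) {
                result = trimmed.substring(MARKER.length()).trim();
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Sample lines taken from the kind of trace a JobManager log contains.
        String sample = String.join("\n",
            "Caused by: org.apache.flink.util.FlinkException: Could not start the job manager.",
            "       at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.lambda$handleException$7(JobManagerRunnerImpl.java:456)",
            "Caused by: java.lang.IllegalStateException: DefaultLeaderRetrievalService can only be started once.",
            "       at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)");
        System.out.println(innermostCause(sample));
    }
}
```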

 

 

Best,

Yang

 

Jerome Li <[hidden email]> wrote on Tuesday, May 25, 2021 at 3:43 AM:

Hi,

 

I am running Flink v1.12.2 in standalone mode on Kubernetes, with Kubernetes native HA enabled.

 

HA works well when either a jobmanager or a taskmanager pod is lost or crashes.

 

But when I restart the master node, the jobmanager pod always crashes and restarts. This causes the entire Flink cluster to restart, and most of the taskmanager pods restart as well.

 

I didn’t see this issue when using ZooKeeper for HA. I am not sure whether this is a bug that should be fixed or whether there is a workaround.

 

 

Below are my Flink settings:

Job-Manager

flink-conf.yaml:

----

jobmanager.rpc.address: streakerflink-jobmanager

 

high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory

high-availability.cluster-id: /streaker

high-availability.jobmanager.port: 6123

high-availability.storageDir: hdfs://hdfs-namenode-0.hdfs-namenode:8020/flink

kubernetes.cluster-id: streaker

 

rest.address: streakerflink-jobmanager

rest.bind-port: 8081

rest.port: 8081

 

state.checkpoints.dir: hdfs://hdfs-namenode-0.hdfs-namenode:8020/flink/streaker

 

blob.server.port: 6124

metrics.internal.query-service.port: 6125

metrics.reporters: prom

metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter

metrics.reporter.prom.port: 9999

 

restart-strategy: fixed-delay

restart-strategy.fixed-delay.attempts: 2147483647

restart-strategy.fixed-delay.delay: 5 s

 

jobmanager.memory.process.size: 1768m

 

parallelism.default: 1

 

task.cancellation.timeout: 2000

 

web.log.path: /opt/flink/log/output.log

jobmanager.web.log.path: /opt/flink/log/output.log

 

web.submit.enable: false

 

Task-Manager

flink-conf.yaml:

----

jobmanager.rpc.address: streakerflink-jobmanager

 

high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory

high-availability.cluster-id: /streaker

high-availability.storageDir: hdfs://hdfs-namenode-0.hdfs-namenode:8020/flink

kubernetes.cluster-id: streaker

 

taskmanager.network.bind-policy: ip

 

taskmanager.data.port: 6121

taskmanager.rpc.port: 6122

 

restart-strategy: fixed-delay

restart-strategy.fixed-delay.attempts: 2147483647

restart-strategy.fixed-delay.delay: 5 s

 

taskmanager.memory.task.heap.size: 9728m

taskmanager.memory.framework.off-heap.size: 512m

taskmanager.memory.managed.size: 512m

taskmanager.memory.jvm-metaspace.size: 256m

taskmanager.memory.jvm-overhead.max: 3g

taskmanager.memory.jvm-overhead.fraction: 0.035

taskmanager.memory.network.fraction: 0.03

taskmanager.memory.network.max: 3g

taskmanager.numberOfTaskSlots: 1

 

taskmanager.jvm-exit-on-oom: true

 

metrics.internal.query-service.port: 6125

metrics.reporters: prom

metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter

metrics.reporter.prom.port: 9999

 

web.log.path: /opt/flink/log/output.log

taskmanager.log.path: /opt/flink/log/output.log

 

task.cancellation.timeout: 2000

 

Any help will be appreciated!

 

Thanks,

Jerome