Hi,

I am running Flink v1.12.2 in standalone mode on Kubernetes, with Kubernetes native HA enabled. HA works well when either the JobManager or a TaskManager pod is lost or crashes. But when I restart a master node, the JobManager pod always crashes and restarts. This causes the entire Flink cluster to restart, and most of the TaskManager pods restart as well.

I didn't see this issue when using ZooKeeper for HA. I'm not sure whether this is a bug that should be handled or whether there is a workaround.
Below are my Flink settings.

JobManager flink-conf.yaml:
----
jobmanager.rpc.address: streakerflink-jobmanager
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.cluster-id: /streaker
high-availability.jobmanager.port: 6123
high-availability.storageDir: hdfs://hdfs-namenode-0.hdfs-namenode:8020/flink
kubernetes.cluster-id: streaker
rest.address: streakerflink-jobmanager
rest.bind-port: 8081
rest.port: 8081
state.checkpoints.dir: hdfs://hdfs-namenode-0.hdfs-namenode:8020/flink/streaker
blob.server.port: 6124
metrics.internal.query-service.port: 6125
metrics.reporters: prom
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9999
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647
restart-strategy.fixed-delay.delay: 5 s
jobmanager.memory.process.size: 1768m
parallelism.default: 1
task.cancellation.timeout: 2000
web.log.path: /opt/flink/log/output.log
jobmanager.web.log.path: /opt/flink/log/output.log
web.submit.enable: false

TaskManager flink-conf.yaml:
----
jobmanager.rpc.address: streakerflink-jobmanager
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.cluster-id: /streaker
high-availability.storageDir: hdfs://hdfs-namenode-0.hdfs-namenode:8020/flink
kubernetes.cluster-id: streaker
taskmanager.network.bind-policy: ip
taskmanager.data.port: 6121
taskmanager.rpc.port: 6122
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647
restart-strategy.fixed-delay.delay: 5 s
taskmanager.memory.task.heap.size: 9728m
taskmanager.memory.framework.off-heap.size: 512m
taskmanager.memory.managed.size: 512m
taskmanager.memory.jvm-metaspace.size: 256m
taskmanager.memory.jvm-overhead.max: 3g
taskmanager.memory.jvm-overhead.fraction: 0.035
taskmanager.memory.network.fraction: 0.03
taskmanager.memory.network.max: 3g
taskmanager.numberOfTaskSlots: 1
taskmanager.jvm-exit-on-oom: true
metrics.internal.query-service.port: 6125
metrics.reporters: prom
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9999
web.log.path: /opt/flink/log/output.log
taskmanager.log.path: /opt/flink/log/output.log
task.cancellation.timeout: 2000

Any help will be appreciated!

Thanks,
Jerome
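TaskManagers locate the current JobManager leader through the same HA services as the JobManager, so one quick sanity check is that both files agree on the HA-related keys. The following is a minimal Python sketch, not a Flink tool: the helper names `parse_flink_conf` and `ha_mismatches`, and the choice of keys in `HA_KEYS`, are assumptions for illustration, and the parser only handles flat `key: value` lines like the ones above. With the values posted above, it reports no mismatch.

```python
from typing import Dict, Iterable, Optional, Tuple

# Keys assumed (for this sketch) to need identical values on the JobManager and
# the TaskManagers, so that both sides use the same HA metadata and storage.
HA_KEYS: Tuple[str, ...] = (
    "high-availability",
    "high-availability.cluster-id",
    "high-availability.storageDir",
    "kubernetes.cluster-id",
)


def parse_flink_conf(text: str) -> Dict[str, str]:
    """Parse flat 'key: value' lines, skipping blank lines and '#' comments."""
    conf: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first ':' only, so values such as hdfs://host:8020/... stay intact.
        key, sep, value = line.partition(":")
        if sep:
            conf[key.strip()] = value.strip()
    return conf


def ha_mismatches(
    jm: Dict[str, str],
    tm: Dict[str, str],
    keys: Iterable[str] = HA_KEYS,
) -> Dict[str, Tuple[Optional[str], Optional[str]]]:
    """Return {key: (jobmanager_value, taskmanager_value)} for every key that differs."""
    return {k: (jm.get(k), tm.get(k)) for k in keys if jm.get(k) != tm.get(k)}


if __name__ == "__main__":
    jm_conf = parse_flink_conf("""
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.cluster-id: /streaker
high-availability.storageDir: hdfs://hdfs-namenode-0.hdfs-namenode:8020/flink
kubernetes.cluster-id: streaker
""")
    tm_conf = dict(jm_conf)  # the posted TaskManager file has the same HA values
    print(ha_mismatches(jm_conf, tm_conf))  # prints {}
```

The check is deliberately narrow: it only rules out one class of misconfiguration and says nothing about why leader retrieval fails at runtime.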
By "restart master node", do you mean restarting the K8s master components (e.g. APIServer, ETCD, etc.)? Even if the master components are restarted, the Flink JobManager and TaskManagers should eventually recover. Could you please share the JobManager logs so that we can debug why it crashed?

Best,
Yang

Jerome Li <[hidden email]> wrote on Tue, May 25, 2021 at 3:43 AM:
Hi Yang,

Thanks for getting back to me. By "restart master node", I mean that I run "kubectl get nodes" to find the nodes with the master role, "ssh" into one of the master nodes as the ubuntu user, and then run "sudo /sbin/reboot -f" to restart that master node.

After that, it looks like the JobManager cancels the running job and logs the following:

2021-05-26 18:28:37,997 [INFO] org.apache.flink.runtime.executiongraph.ExecutionGraph - Discarding the results produced by task execution 34eb9f5009dc7cf07117e720e7d393de.
2021-05-26 18:28:37,999 [INFO] org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore - Suspending
2021-05-26 18:28:37,999 [INFO] org.apache.flink.kubernetes.highavailability.KubernetesCheckpointIDCounter - Shutting down.
2021-05-26 18:28:38,000 [INFO] org.apache.flink.runtime.executiongraph.ExecutionGraph - Job 74fc5c858e50f5efc91db9ee16c17a8c has been suspended.
2021-05-26 18:28:38,007 [INFO] org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Suspending SlotPool.
2021-05-26 18:28:38,007 [INFO] org.apache.flink.runtime.jobmaster.JobMaster - Close ResourceManager connection 5bac86fb0b5c984ef429225b8de82cc0: JobManager is no longer the leader..
2021-05-26 18:28:38,019 [INFO] org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl - JobManager runner for job hogger (74fc5c858e50f5efc91db9ee16c17a8c) was granted leadership with session id 14b9004a-3807-42e8-ac03-c0d77efe5611 at akka.tcp://flink@hoggerflink-jobmanager:6123/user/rpc/jobmanager_2.
2021-05-26 18:28:38,292 [INFO] org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. Discarding message org.apache.flink.runtime.rpc.messages.RemoteFencedMessage until processing is started.
2021-05-26 18:28:38,292 [INFO] org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. Discarding message org.apache.flink.runtime.rpc.messages.RemoteFencedMessage until processing is started.
2021-05-26 18:28:38,292 [INFO] org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. Discarding message org.apache.flink.runtime.rpc.messages.RemoteFencedMessage until processing is started.
2021-05-26 18:28:38,293 [INFO] org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. Discarding message org.apache.flink.runtime.rpc.messages.RemoteFencedMessage until processing is started.
2021-05-26 18:28:38,293 [INFO] org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. Discarding message org.apache.flink.runtime.rpc.messages.RemoteFencedMessage until processing is started.
2021-05-26 18:28:38,293 [INFO] org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. Discarding message org.apache.flink.runtime.rpc.messages.RemoteFencedMessage until processing is started.
2021-05-26 18:28:38,293 [INFO] org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. Discarding message org.apache.flink.runtime.rpc.messages.RemoteFencedMessage until processing is started.
2021-05-26 18:28:38,293 [INFO] org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. Discarding message org.apache.flink.runtime.rpc.messages.LocalFencedMessage until processing is started.
2021-05-26 18:28:38,293 [INFO] org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. Discarding message org.apache.flink.runtime.rpc.messages.LocalFencedMessage until processing is started.
2021-05-26 18:28:38,295 [INFO] org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. Discarding message org.apache.flink.runtime.rpc.messages.LocalFencedMessage until processing is started.
2021-05-26 18:28:38,295 [INFO] org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. Discarding message org.apache.flink.runtime.rpc.messages.LocalFencedMessage until processing is started.
2021-05-26 18:28:38,295 [INFO] org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. Discarding message org.apache.flink.runtime.rpc.messages.RemoteFencedMessage until processing is started.
2021-05-26 18:28:38,296 [INFO] org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. Discarding message org.apache.flink.runtime.rpc.messages.RemoteFencedMessage until processing is started.
2021-05-26 18:28:38,296 [INFO] org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. Discarding message org.apache.flink.runtime.rpc.messages.LocalFencedMessage until processing is started.
2021-05-26 18:28:38,299 [ERROR] org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Fatal error occurred in the cluster entrypoint.
org.apache.flink.util.FlinkException: JobMaster for job 74fc5c858e50f5efc91db9ee16c17a8c failed.
	at org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:887) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.dispatcher.Dispatcher.dispatcherJobFailed(Dispatcher.java:465) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$3(Dispatcher.java:426) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907) ~[?:?]
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.12.2.jar:1.12.2]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.12.2.jar:1.12.2]
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-dist_2.12-1.12.2.jar:1.12.2]
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-dist_2.12-1.12.2.jar:1.12.2]
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.12.2.jar:1.12.2]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.12.2.jar:1.12.2]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.12.2.jar:1.12.2]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.12.2.jar:1.12.2]
	at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.12.2.jar:1.12.2]
	at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.12.2.jar:1.12.2]
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.12.2.jar:1.12.2]
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.12.2.jar:1.12.2]
	at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.12.2.jar:1.12.2]
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.12.2.jar:1.12.2]
	at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.12.2.jar:1.12.2]
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.12.2.jar:1.12.2]
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.12.2.jar:1.12.2]
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.12.2.jar:1.12.2]
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.12.2.jar:1.12.2]
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.12.2.jar:1.12.2]
Caused by: org.apache.flink.util.FlinkException: Could not start the job manager.
	at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.lambda$handleException$7(JobManagerRunnerImpl.java:456) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
	at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1044) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at akka.dispatch.OnComplete.internal(Future.scala:263) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at akka.dispatch.OnComplete.internal(Future.scala:261) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at akka.actor.ActorRef.tell(ActorRef.scala:126) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleCallAsync(AkkaRpcActor.java:423) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:210) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:100) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	... 21 more
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: DefaultLeaderRetrievalService can only be started once.
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:704) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
	at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1044) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at akka.dispatch.OnComplete.internal(Future.scala:263) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at akka.dispatch.OnComplete.internal(Future.scala:261) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at akka.actor.ActorRef.tell(ActorRef.scala:126) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleCallAsync(AkkaRpcActor.java:423) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:210) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:100) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	... 21 more
Caused by: java.lang.IllegalStateException: DefaultLeaderRetrievalService can only be started once.
	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService.start(DefaultLeaderRetrievalService.java:89) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.jobmaster.JobMaster.startJobMasterServices(JobMaster.java:891) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:864) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.jobmaster.JobMaster.lambda$start$1(JobMaster.java:381) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleCallAsync(AkkaRpcActor.java:419) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:210) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:100) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	... 21 more
2021-05-26 18:28:38,310 [INFO] org.apache.flink.runtime.blob.BlobServer - Stopped BLOB server at 0.0.0.0:6124

Eventually the cluster gets back to work, but sometimes it doesn't: some of the TaskManagers cannot identify the JobManager address, and I have to manually restart the stale TaskManagers.
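To see how many TaskManagers actually re-registered after such a restart, the JobManager's REST API lists the currently registered TaskManagers at `GET /taskmanagers`. Below is a minimal Python sketch under these assumptions: the REST endpoint is reachable at the `rest.address`/`rest.port` values from the configuration above, and `expected_replicas` (a hypothetical parameter) is the number of TaskManager pods you run. It only compares counts; it does not tell you which pod is stale.

```python
import json
import urllib.request
from typing import List

# Assumed base URL, built from rest.address and rest.port in the posted config.
REST_URL = "http://streakerflink-jobmanager:8081"


def fetch_taskmanagers_json(base_url: str = REST_URL, timeout: float = 5.0) -> str:
    """Return the raw JSON body of GET <base_url>/taskmanagers."""
    with urllib.request.urlopen(f"{base_url}/taskmanagers", timeout=timeout) as resp:
        return resp.read().decode("utf-8")


def registered_ids(body: str) -> List[str]:
    """Extract the TaskManager ids from a /taskmanagers response body."""
    return [tm["id"] for tm in json.loads(body).get("taskmanagers", [])]


def missing_taskmanagers(body: str, expected_replicas: int) -> int:
    """Number of expected TaskManagers that are not currently registered."""
    return max(expected_replicas - len(registered_ids(body)), 0)
```

For example, `missing_taskmanagers(fetch_taskmanagers_json(), expected_replicas=3)` returns a non-zero value while some TaskManagers have not yet found the new leader; polling it for a while after the JobManager comes back distinguishes "slow to re-register" from "stuck".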
Is this the expected Flink behavior, is it a bug, or am I missing something?
Best,
Jerome
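The log suggests the sequence behind the crash: the JobMaster loses leadership ("JobManager is no longer the leader"), is granted leadership again a few milliseconds later, and on restart calls `DefaultLeaderRetrievalService.start()` on an instance that was already started, which trips a `Preconditions.checkState` guard and takes down the whole entrypoint. The toy Python model below only illustrates that start-once pattern and one way around it (creating a fresh service for each leadership session). The class and function names are hypothetical, and this is neither Flink's code nor its actual fix.

```python
from typing import Callable


class StartOnceService:
    """Toy service that, like the one in the log, refuses a second start()."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._started = False

    def start(self) -> None:
        # checkState-style guard; Python has no IllegalStateException, so RuntimeError stands in.
        if self._started:
            raise RuntimeError(f"{self.name} can only be started once.")
        self._started = True

    def stop(self) -> None:
        # Stopping deliberately does not reset the flag: each instance is single-use.
        pass


class ToyJobMaster:
    """Hypothetical JobMaster that obtains its retrieval service from a factory."""

    def __init__(self, factory: Callable[[], StartOnceService], reuse: bool) -> None:
        self._factory = factory
        self._reuse = reuse  # True models the crash; False creates a fresh service per grant
        self.retrieval = factory()

    def grant_leadership(self) -> None:
        if not self._reuse:
            self.retrieval = self._factory()
        self.retrieval.start()

    def revoke_leadership(self) -> None:
        self.retrieval.stop()


def survives_regrant(reuse: bool) -> bool:
    """Grant, revoke, and re-grant leadership; report whether the second grant works."""
    jm = ToyJobMaster(lambda: StartOnceService("DefaultLeaderRetrievalService"), reuse)
    jm.grant_leadership()
    jm.revoke_leadership()
    try:
        jm.grant_leadership()
        return True
    except RuntimeError:
        return False


print(survives_regrant(reuse=True))   # False
print(survives_regrant(reuse=False))  # True
```

Creating a new service per session is only one possible design; the alternative of letting `stop()` reset the guard would also work in the toy, but makes it easier to restart a half-torn-down service by accident.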
I think the exception you attached has been fixed by FLINK-22597[1]. Could you please try the latest version? Moreover, it is not the expected Flink behavior that a TaskManager cannot retrieve the new JobManager address and re-register successfully. I think you need to share the logs of the stale TaskManagers so that we can move the debugging forward.

Best,
Yang

Jerome Li <[hidden email]> wrote on Thu, May 27, 2021 at 4:54 AM: