Kubernetes HA - attempting to restore from wrong (non-existing) savepoint

Kubernetes HA - attempting to restore from wrong (non-existing) savepoint

Alexey Trenikhun
Hello,
We have a Flink job running in Kubernetes with Kubernetes HA enabled (the JM is deployed as a Job, a single TM as a StatefulSet). We took a savepoint with cancel=true. Now, when we try to start the job using --fromSavepoint A, where A is the path we got from taking the savepoint (ClusterEntrypoint reports A in the log), the job seems to ignore the given A and instead tries to restore from some other path B (CheckpointCoordinator logs B):

{"ts":"2021-02-26T17:09:52.500Z","message":" Program Arguments:","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:52.501Z","message":"    --configDir","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:52.501Z","message":"    /opt/flink/conf","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:52.501Z","message":"    --fromSavepoint","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:52.501Z","message":"    wasbs://[hidden email]/gsp/savepoints/savepoint-000000-e8a201008f2c","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:52.501Z","message":"    --job-classname","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:52.501Z","message":"    com.App","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:52.501Z","message":"    --job-id","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:52.502Z","message":"    00000000000000000000000000000000","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000} 
...
{"ts":"2021-02-26T17:09:59.176Z","message":"Recovering checkpoints from KubernetesStateHandleStore{configMapName='gsp-00000000000000000000000000000000-jobmanager-leader'}.","logger_name":"org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore","thread_name":"cluster-io-thread-4","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:59.181Z","message":"Found 0 checkpoints in KubernetesStateHandleStore{configMapName='gsp-00000000000000000000000000000000-jobmanager-leader'}.","logger_name":"org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore","thread_name":"cluster-io-thread-4","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:59.183Z","message":"All 0 checkpoints found are already downloaded.","logger_name":"org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore","thread_name":"cluster-io-thread-4","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:59.183Z","message":"No checkpoint found during restore.","logger_name":"org.apache.flink.runtime.checkpoint.CheckpointCoordinator","thread_name":"cluster-io-thread-4","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:59.183Z","message":"Starting job 00000000000000000000000000000000 from savepoint wasbs://[hidden email]/gsp/savepoints/savepoint-000000-fbcd58f66685 (allowing non restored state)","logger_name":"org.apache.flink.runtime.checkpoint.CheckpointCoordinator","thread_name":"cluster-io-thread-4","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:59.191Z","message":"0.0.7+g9bb29061\n  build 2021-02-21T21:13:31-0800\n  tag: 0.0.0.7\n  id: 0.0.0.7\n","logger_name":"com.Fsp","thread_name":"flink-akka.actor.default-dispatcher-2","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:59.273Z","message":"Fatal error occurred in the cluster entrypoint.","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"flink-akka.actor.default-dispatcher-4","level":"ERROR","level_value":40000,"stack_trace":"org.apache.flink.util.FlinkException: JobMaster for job 00000000000000000000000000000000 failed.\n\tat org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:890)\n\tat org.apache.flink.runtime.dispatcher.Dispatcher.dispatcherJobFailed(Dispatcher.java:465)\n\tat org.apache.flink.runtime.dispatcher.Dispatcher.handleDispatcherJobResult(Dispatcher.java:444)\n\tat org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$3(Dispatcher.java:423)\n\tat java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)\n\tat java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)\n\tat java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:442)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:209)\n\tat org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)\n\tat scala.PartialFunction.applyOrElse(PartialFunction.scala:123)\n\tat scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)\n\tat akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)\n\tat akka.actor.Actor.aroundReceive(Actor.scala:517)\n\tat akka.actor.Actor.aroundReceive$(Actor.scala:515)\n\tat akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)\n\tat akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)\n\tat akka.actor.ActorCell.invoke(ActorCell.scala:561)\n\tat akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)\n\tat akka.dispatch.Mailbox.run(Mailbox.scala:225)\n\tat akka.dispatch.Mailbox.exec(Mailbox.scala:235)\n\tat akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)\n\tat akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)\n\tat akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)\n\tat akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)\nCaused by: org.apache.flink.runtime.client.JobInitializationException: Could not instantiate JobManager.\n\tat org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:494)\n\tat java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: java.io.FileNotFoundException: Cannot find checkpoint or savepoint file/directory 'wasbs://[hidden email]/gsp/savepoints/savepoint-000000-fbcd58f66685' on file system 'wasbs'.\n\tat 
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.resolveCheckpointPointer(AbstractFsCheckpointStorageAccess.java:250)\n\tat org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.resolveCheckpoint(AbstractFsCheckpointStorageAccess.java:111)\n\tat org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1632)\n\tat org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:358)\n\tat org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:288)\n\tat org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:245)\n\tat org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:133)\n\tat org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:111)\n\tat org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:345)\n\tat org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:330)\n\tat org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:95)\n\tat org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:39)\n\tat org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:162)\n\tat org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:86)\n\tat org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:478)\n\t... 4 common frames omitted\n"}

Any suggestions?

Thanks,
Alexey
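
For context, a minimal sketch (not part of the original report) of how such a savepoint with cancel can be triggered through the Flink REST API, i.e. POST /jobs/:jobid/savepoints with cancel-job=true, the endpoint mentioned later in this thread; the REST address, job id, and target directory below are placeholders:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class StopWithSavepoint {
    public static void main(String[] args) throws Exception {
        // Placeholders (not taken from the report): JobManager REST address and job id.
        String restBase = "http://gsp-jobmanager:8081";
        String jobId = "00000000000000000000000000000000";

        // POST /jobs/:jobid/savepoints triggers an asynchronous savepoint;
        // with "cancel-job": true the job is cancelled once the savepoint completes.
        String body = "{\"target-directory\": \"wasbs://.../gsp/savepoints\", \"cancel-job\": true}";
        HttpRequest trigger = HttpRequest.newBuilder()
                .uri(URI.create(restBase + "/jobs/" + jobId + "/savepoints"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(trigger, HttpResponse.BodyHandlers.ofString());
        // The response body contains a "request-id" used to poll the savepoint status.
        System.out.println(response.body());
    }
}

Polling GET /jobs/:jobid/savepoints/:triggerid with that request-id reports the savepoint status and, on completion, the savepoint path that is then passed to --fromSavepoint.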

Re: Kubernetes HA - attempting to restore from wrong (non-existing) savepoint

Yang Wang
Hi Alexey,

It seems that the KubernetesHAService works well, since all the checkpoints were cleaned up when the job was cancelled; we can see the related log line "Found 0 checkpoints in KubernetesStateHandleStore{configMapName='gsp-00000000000000000000000000000000-jobmanager-leader'}.".

However, it is a little strange that the CheckpointCoordinator is recovering from the wrong savepoint path. Could you share the full JobManager logs? One possible reason I can guess is that the application cluster entrypoint is not creating a new JobGraph from the specified arguments.


Best,
Yang 

Re: Kubernetes HA - attempting to restore from wrong (non-existing) savepoint

Alexey Trenikhun
Hi Yang,
Unfortunately I didn't save the log. I'm trying to reproduce it again, but now I'm hitting a different error, about an incompatible version of ImmutableMapSerializer. This is strange: while the serialVersionUID did indeed change, the serializer is only registered, not used (there is no state using Kryo; I call disableGenericTypes to ensure this). Could it be that when I call registerTypeWithKryoSerializer, the serializer becomes part of the JobGraph? If so, then it is probably the same root cause: a new JobGraph is not created.
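
For context, a minimal sketch of the kind of registration described above, with a hypothetical Serializable Kryo serializer standing in for the ImmutableMapSerializer mentioned here. A serializer registered by instance is Java-serialized into the ExecutionConfig, which travels inside the JobGraph, so a changed serialVersionUID could indeed surface only when an old JobGraph is recovered instead of a new one being created:

import java.io.Serializable;
import java.util.HashMap;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.collect.ImmutableMap;

public class KryoRegistration {

    // Hypothetical stand-in for the ImmutableMapSerializer from this thread.
    // A serializer registered by instance must be Serializable, hence the serialVersionUID.
    public static class ImmutableMapSerializer extends Serializer<ImmutableMap<?, ?>> implements Serializable {
        private static final long serialVersionUID = 1L; // changing this can break previously persisted JobGraphs

        @Override
        public void write(Kryo kryo, Output output, ImmutableMap<?, ?> map) {
            kryo.writeObject(output, new HashMap<Object, Object>(map));
        }

        @Override
        public ImmutableMap<?, ?> read(Kryo kryo, Input input, Class<ImmutableMap<?, ?>> type) {
            return ImmutableMap.copyOf(kryo.readObject(input, HashMap.class));
        }
    }

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableGenericTypes(); // fail fast if any type falls back to Kryo
        env.getConfig().registerTypeWithKryoSerializer(ImmutableMap.class, new ImmutableMapSerializer());
        // ... build and execute the pipeline ...
    }
}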

Thanks,
Alexey


Attachment: job-graph-serialVersionUID.log (67K)

Re: Kubernetes HA - attempting to restore from wrong (non-existing) savepoint

Alexey Trenikhun
In reply to this post by Yang Wang
Hi Yang,
The problem has re-occurred; the full JM log is attached.

Thanks,
Alexey

Attachment: jm-wrong-savepoint.log (73K)

Re: Kubernetes HA - attempting to restore from wrong (non-existing) savepoint

Yang Wang
Hi Alexey,

From your attached logs, it seems that the leader-related ConfigMap is reused, so the Flink application is recovered instead of being submitted as a new one. This is the root cause: it is trying to recover from the wrong savepoint, which is the one specified in your previous submission.

> So how to fix this?
If you want to stop the application, I strongly suggest cancelling the Flink job with a savepoint instead of directly deleting all the K8s resources. After that, you will find that the leader-related ConfigMaps are deleted automatically once the job is cancelled.

Best,
Yang


Re: Kubernetes HA - attempting to restore from wrong (non-existing) savepoint

Alexey Trenikhun
Hi Yang,
The upgrade procedure uses POST /jobs/:jobid/savepoints with cancel-job=true, but perhaps I hit FLINK-21028. This leads to the question: if the normal take-savepoint-and-cancel-job via the API fails, what steps should be done outside Flink to be able to resume from a savepoint with a new job version? Is deleting the Kubernetes Job and the HA ConfigMaps enough, or should something in persisted storage be deleted as well?

Thanks,
Alexey
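
For reference, a hedged sketch of the manual ConfigMap cleanup being asked about, using the fabric8 Kubernetes client; the namespace and the label selectors below are assumptions about how Flink's Kubernetes HA ConfigMaps are labelled and should be verified against the actual cluster (e.g. kubectl get configmap --show-labels) before deleting anything:

import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;

public class DeleteHaConfigMaps {
    public static void main(String[] args) {
        // Assumptions: the namespace and the labels that Flink's Kubernetes HA services
        // put on their ConfigMaps; verify both against the cluster before deleting.
        try (KubernetesClient client = new DefaultKubernetesClient()) {
            client.configMaps()
                  .inNamespace("default")
                  .withLabel("app", "gsp")                          // assumed cluster-id label
                  .withLabel("configmap-type", "high-availability") // assumed HA ConfigMap label
                  .delete();
        }
    }
}

Deleting these ConfigMaps (together with the JobManager Kubernetes Job) only removes the checkpoint and JobGraph pointers held in Kubernetes; the savepoint/checkpoint files and anything under high-availability.storageDir in wasbs:// are left untouched. Whether that is sufficient is exactly the open question above.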

org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.resolveCheckpointPointer(AbstractFsCheckpointStorageAccess.java:250)\n\tat org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.resolveCheckpoint(AbstractFsCheckpointStorageAccess.java:111)\n\tat org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1632)\n\tat org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:358)\n\tat org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:288)\n\tat org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:245)\n\tat org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:133)\n\tat org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:111)\n\tat org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:345)\n\tat org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:330)\n\tat org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:95)\n\tat org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:39)\n\tat org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:162)\n\tat org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:86)\n\tat org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:478)\n\t... 4 common frames omitted\n"}

Any suggestions?

Thanks,
Alexey

Re: Kubernetes HA - attempting to restore from wrong (non-existing) savepoint

Yang Wang
If the HA-related ConfigMaps still exist, then I am afraid the data located on the distributed storage also still exists.
So I suggest deleting the HA-related storage as well.

Deleting all the HA-related data manually should help in your current situation. After that you can recover from the new savepoint.
However, I do not think this is normal behavior: when the application reaches a terminal state (e.g. FINISHED, FAILED, CANCELLED),
all HA-related data should be cleaned up automatically.
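Roughly, the manual cleanup would look like the sketch below. The ConfigMap names other than the jobmanager-leader one are the ones Flink 1.12 typically creates for a cluster-id of "gsp", so please confirm them with the listing first; the storage path is whatever high-availability.storageDir points to in your setup:

  # list the HA ConfigMaps for this cluster-id
  kubectl get configmaps | grep gsp
  # delete them (adjust the names to what the listing actually shows)
  kubectl delete configmap gsp-dispatcher-leader gsp-resourcemanager-leader gsp-restserver-leader \
      gsp-00000000000000000000000000000000-jobmanager-leader
  # then delete the contents of high-availability.storageDir on the blob storage
  # (e.g. with az storage blob delete-batch or hadoop fs -rm -r, depending on your tooling)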

Could you provide the JobManager logs from when you are cancelling the job? Using `kubectl logs -f {pod_name}` should stream
the logs in real time.

Best,
Yang

Alexey Trenikhun <[hidden email]> wrote on Fri, Mar 12, 2021, 12:47 AM:
Hi Yang,
The upgrade procedure uses POST /jobs/:jobid/savepoints with cancel-job=true, but perhaps I hit FLINK-21028. This leads to the question: if the normal take-savepoint-and-cancel-job via the API fails, what steps should be done outside Flink to be able to resume from the savepoint with a new job version? Is deleting the Kubernetes Job and the HA ConfigMaps enough, or should something in persistent storage be deleted as well?
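For context, the REST call the upgrade procedure makes is essentially the following sketch (the host, port, target directory and request id are placeholders; the job id is the fixed one we use):

  curl -X POST http://<jobmanager-host>:8081/jobs/00000000000000000000000000000000/savepoints \
    -H 'Content-Type: application/json' \
    -d '{"target-directory": "wasbs://.../gsp/savepoints", "cancel-job": true}'
  # the response contains a request-id; poll it to check whether the savepoint and cancellation completed
  curl http://<jobmanager-host>:8081/jobs/00000000000000000000000000000000/savepoints/<request-id>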

Thanks,
Alexey

From: Yang Wang <[hidden email]>
Sent: Thursday, March 11, 2021 2:59 AM
To: Alexey Trenikhun <[hidden email]>
Cc: Flink User Mail List <[hidden email]>
Subject: Re: Kubernetes HA - attempting to restore from wrong (non-existing) savepoint
 
Hi Alexey,

From your attached logs, it seems that the leader-related ConfigMap is reused.
Then the Flink application is recovered instead of being submitted as a new one. This is
the root cause of it trying to recover from the wrong savepoint: the one that was specified in
your previous submission.

> So how to fix this?
If you want to stop the application, I strongly suggest cancelling the Flink job with a savepoint
instead of directly deleting all the K8s resources. You will then find that the leader-related
ConfigMaps are deleted automatically after the job is cancelled.
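For example, a sketch with the CLI (the savepoint directory is the one from your setup, and the CLI needs to be pointed at the cluster's REST endpoint, e.g. via -m):

  flink stop --savepointPath wasbs://.../gsp/savepoints 00000000000000000000000000000000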

Best,
Yang

Alexey Trenikhun <[hidden email]> wrote on Wed, Mar 10, 2021, 12:16 PM:
Hi Yang,
The problem has re-occurred; the full JM log is attached.

Thanks,
Alexey


Re: Kubernetes HA - attempting to restore from wrong (non-existing) savepoint

Alexey Trenikhun
With 1.12.1 it happened quite often; with 1.12.2 not that much. I think I saw it once or twice in ~20 cancels. When it happened, the job actually restarted on cancel. I did not grab the log at that time, but chances are good that I will be able to reproduce it.
Thanks,
Alexey



Re: Kubernetes HA - attempting to restore from wrong (non-existing) savepoint

Yang Wang
Feel free to share the terminated JobManager logs if you can reproduce this issue again.
"kubectl logs {pod_name} --previous" may help.


Best,
Yang


Re: Kubernetes HA - attempting to restore from wrong (non-existing) savepoint

Alexey Trenikhun
Hi Yang,
I think I've reproduced the problem: the job was cancelled, but something went wrong, the JM exited with code 2, and as a result Kubernetes restarted the JM pod. The JM log is attached.
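For reference, the exit code and restart can be confirmed on the Kubernetes side with the following (the pod name is a placeholder):

  # last terminated state of the JM container (exit code, reason, timestamps)
  kubectl get pod <jm-pod-name> -o jsonpath='{.status.containerStatuses[0].lastState.terminated}'
  # or the human-readable view
  kubectl describe pod <jm-pod-name>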

Thanks,
Alexey


jm-exit-code2.log (618K) Download Attachment

Re: Kubernetes HA - attempting to restore from wrong (non-existing) savepoint

Yang Wang
Hi Alexey,

From your attached logs, I do not think the newly started JobManager will recover from the wrong savepoint,
because the following log line indicates that the HA-related ConfigMaps have been cleaned up successfully.

1393 {"ts":"2021-03-20T02:02:18.506Z","message":"Finished cleaning up the high availability data.","logger_name":"org.apache.flink.kubernetes.highavailability.KubernetesHaServices","thread_name":"AkkaRpcService-Supervisor-Termination-Future-Executor-thread-1","level":"INFO","level_value":20000}


So now the question is why the JobManager pod exited with code 2.
It seems that "clusterEntrypoint.getTerminationFuture().get()" failed with a timeout while terminating.
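In case it helps, one way to confirm the exit code and grab the logs of the terminated container could look like this (a rough sketch; the pod name is a placeholder):

# Show the exit code of the previously terminated JobManager container
kubectl get pod <jm-pod-name> \
  -o jsonpath='{.status.containerStatuses[0].lastState.terminated.exitCode}'

# Dump the logs of the previous container instance for later inspection
kubectl logs <jm-pod-name> --previous > jm-previous.log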

Alexey Trenikhun <[hidden email]> 于2021年3月20日周六 上午10:16写道:
Hi Yang,
I think I've reproduced the problem - the job was cancelled, but something went wrong: the JM exited with code 2, and as a result Kubernetes restarted the JM pod. The JM log is attached.

Thanks,
Alexey

From: Yang Wang <[hidden email]>
Sent: Monday, March 15, 2021 1:26 AM
To: Alexey Trenikhun <[hidden email]>
Cc: Flink User Mail List <[hidden email]>
Subject: Re: Kubernetes HA - attempting to restore from wrong (non-existing) savepoint
 
Feel free to share the terminated JobManager logs if you can reproduce this issue again.
Maybe "kubectl logs {pod_name} --previous" could help.


Best,
Yang

Alexey Trenikhun <[hidden email]> 于2021年3月15日周一 下午2:28写道:
With 1.12.1 it happened quite often; with 1.12.2 not as much - I think I saw it once or twice over ~20 cancels. When it happened, the job actually restarted on cancel. I did not grab the log at that time, but chances are good that I will be able to reproduce it.
Thanks,
Alexey


From: Yang Wang <[hidden email]>
Sent: Sunday, March 14, 2021 7:50:21 PM
To: Alexey Trenikhun <[hidden email]>
Cc: Flink User Mail List <[hidden email]>
Subject: Re: Kubernetes HA - attempting to restore from wrong (non-existing) savepoint
 
If the HA-related ConfigMaps still exist, then I am afraid the data located on the distributed storage also still exists.
So I suggest deleting the HA-related storage as well.

Deleting all the HA-related data manually should help in your current situation. After that you can recover from the new savepoint.
However, I do not think this is normal behavior, since when the application reaches a terminal state (e.g. FINISHED, FAILED, CANCELLED),
all HA-related data should be cleaned up automatically.
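A rough sketch of that manual cleanup, using your cluster-id "gsp" (the label selector is the one I believe the Kubernetes HA services put on their ConfigMaps, please double check; the storage path is a placeholder):

# Delete the HA-related ConfigMaps of the cluster
kubectl delete configmaps --selector='app=gsp,configmap-type=high-availability'

# Also remove the HA data on the distributed storage, i.e. whatever
# high-availability.storageDir points to for this cluster-id
# (the exact path depends on your configuration)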

Could you provide the JobManager logs from when you are cancelling the job? I believe `kubectl logs -f {pod_name}` could dump
the logs in real time.

Best,
Yang

Alexey Trenikhun <[hidden email]> 于2021年3月12日周五 上午12:47写道:
Hi Yang,
The upgrade procedure uses POST /jobs/:jobid/savepoints with cancel-job=true, but perhaps I hit FLINK-21028. This leads to the question: if the normal take-savepoint-and-cancel-job via the REST API fails, what steps should be done outside Flink to be able to resume from the savepoint with a new job version? Is deleting the Kubernetes Job and the HA ConfigMaps enough, or should something in the persisted storage be deleted as well?
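For reference, the trigger-and-poll calls I make look roughly like this (host/port and trigger id are placeholders):

# Trigger a savepoint that also cancels the job
curl -X POST http://<jobmanager>:8081/jobs/00000000000000000000000000000000/savepoints \
  -H 'Content-Type: application/json' \
  -d '{"target-directory": "wasbs://gsp-state@.../gsp/savepoints", "cancel-job": true}'
# -> {"request-id": "<trigger-id>"}

# Poll for the result and the final savepoint path
curl http://<jobmanager>:8081/jobs/00000000000000000000000000000000/savepoints/<trigger-id>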

Thanks,
Alexey

From: Yang Wang <[hidden email]>
Sent: Thursday, March 11, 2021 2:59 AM
To: Alexey Trenikhun <[hidden email]>
Cc: Flink User Mail List <[hidden email]>
Subject: Re: Kubernetes HA - attempting to restore from wrong (non-existing) savepoint
 
Hi Alexey,

From your attached logs, it seems that the leader-related ConfigMap is reused.
The Flink application is then recovered instead of a new one being submitted. This is
the root cause of why it is trying to recover from a wrong savepoint, which was specified in
your last submission.

> So how to fix this?
If you want to stop the application, I strongly suggest cancelling the Flink job with a savepoint
instead of directly deleting all the K8s resources. You will then find that the leader-related
ConfigMaps are deleted automatically after the job is cancelled.
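For example, something along these lines with the Flink CLI (job id and target directory are placeholders, please double check the flags for your version):

# Stop the job with a savepoint
flink stop --savepointPath wasbs://gsp-state@.../gsp/savepoints 00000000000000000000000000000000

# Or the older cancel-with-savepoint variant
flink cancel -s wasbs://gsp-state@.../gsp/savepoints 00000000000000000000000000000000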

Best,
Yang

Alexey Trenikhun <[hidden email]> 于2021年3月10日周三 下午12:16写道:
Hi Yang,
The problem has re-occurred; the full JM log is attached.

Thanks,
Alexey

From: Yang Wang <[hidden email]>
Sent: Sunday, February 28, 2021 10:04 PM
To: Alexey Trenikhun <[hidden email]>
Cc: Flink User Mail List <[hidden email]>
Subject: Re: Kubernetes HA - attempting to restore from wrong (non-existing) savepoint
 
Hi Alexey,

It seems that the KubernetesHAService works well, since all the checkpoints were cleaned up when the job was cancelled,
and we can see the related log line "Found 0 checkpoints in KubernetesStateHandleStore{configMapName='gsp-00000000000000000000000000000000-jobmanager-leader'}.".

However, it is a little strange that the CheckpointCoordinator is recovering from a wrong savepoint path. Could you share the
full JobManager logs? One possible reason I could guess is that the application cluster entrypoint is not creating a new JobGraph from the specified arguments.


Best,
Yang 


Re: Kubernetes HA - attempting to restore from wrong (non-existing) savepoint

Alexey Trenikhun
Hi Yang,
The HA data was cleared, but it would be re-created when Kubernetes restarts the failed (exit code 2) Job. So the upgrade would happen on a live job. I guess the upgrade procedure should re-check or monitor the Kubernetes Job to ensure that it has completed, as sketched below.
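Something like the following could serve as that check (the Job name is a placeholder for whatever the JM Job is called in the deployment):

# Block until the JobManager Kubernetes Job reports completion;
# a non-zero exit code means it did not complete in time, so the
# upgrade should not proceed yet
kubectl wait --for=condition=complete job/<jm-job-name> --timeout=10m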

Thanks,
Alexey 


From: Yang Wang <[hidden email]>
Sent: Tuesday, March 23, 2021 11:17:18 PM
To: Alexey Trenikhun <[hidden email]>
Cc: Flink User Mail List <[hidden email]>
Subject: Re: Kubernetes HA - attempting to restore from wrong (non-existing) savepoint
 