Flink not restoring from checkpoint when job manager fails even with HA


Flink not restoring from checkpoint when job manager fails even with HA

Kathula, Sandeep

Hi,

    We are running Flink on Kubernetes. We used https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/jobmanager_high_availability.html to set up high availability, and we set the maximum number of restart attempts for a task to 2. After the task fails twice, the job manager fails, which is expected. However, it also removes the checkpoint from ZooKeeper, so on restart the job does not resume from the previous checkpoint and we are losing data.
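
(For reference, a minimal sketch of this kind of checkpoint/restart configuration with the Flink 1.10 DataStream API is below; the class name, checkpoint interval, restart delay, and placeholder source are illustrative assumptions, not the actual job code.)

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SessionizationJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 60 seconds; the checkpoint target (the S3 bucket in the logs)
        // comes from state.checkpoints.dir in flink-conf.yaml.
        env.enableCheckpointing(60_000);

        // Retain the latest completed checkpoint when the job reaches a terminal state,
        // so it can be resumed from explicitly. This is the "externalized" checkpoint
        // that appears as .../chk-11 in the logs below.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // "Maximum number of restart attempts for a task set to 2": a fixed-delay restart
        // strategy with 2 attempts (the 10 second delay is an illustrative value).
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.of(10, TimeUnit.SECONDS)));

        // Placeholder for the actual sessionization sources, operators, and sinks.
        env.fromElements("a", "b", "c").print();

        env.execute("sessionization_test");
    }
}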

 

Logs:

 

2020/06/06 19:39:07.759 INFO  o.a.f.r.c.CheckpointCoordinator - Stopping checkpoint coordinator for job 00000000000000000000000000000000.

2020/06/06 19:39:07.759 INFO  o.a.f.r.c.ZooKeeperCompletedCheckpointStore - Shutting down

2020/06/06 19:39:07.823 INFO  o.a.f.r.z.ZooKeeperStateHandleStore - Removing /flink/sessionization_test4/checkpoints/00000000000000000000000000000000 from ZooKeeper

2020/06/06 19:39:07.823 INFO  o.a.f.r.c.CompletedCheckpoint - Checkpoint with ID 11 at 's3://s3_bucket/sessionization_test/checkpoints/00000000000000000000000000000000/chk-11' not discarded.

2020/06/06 19:39:07.829 INFO  o.a.f.r.c.ZooKeeperCheckpointIDCounter - Shutting down.

2020/06/06 19:39:07.829 INFO  o.a.f.r.c.ZooKeeperCheckpointIDCounter - Removing /checkpoint-counter/00000000000000000000000000000000 from ZooKeeper

2020/06/06 19:39:07.852 INFO  o.a.f.r.dispatcher.MiniDispatcher - Job 00000000000000000000000000000000 reached globally terminal state FAILED.

2020/06/06 19:39:07.852 INFO  o.a.f.runtime.jobmaster.JobMaster - Stopping the JobMaster for job sppstandardresourcemanager-flink-0606193838-6d7dae7e(00000000000000000000000000000000).

2020/06/06 19:39:07.854 INFO  o.a.f.r.entrypoint.ClusterEntrypoint - Shutting StandaloneJobClusterEntryPoint down with application status FAILED. Diagnostics null.

2020/06/06 19:39:07.854 INFO  o.a.f.r.j.MiniDispatcherRestEndpoint - Shutting down rest endpoint.

2020/06/06 19:39:07.856 INFO  o.a.f.r.l.ZooKeeperLeaderRetrievalService - Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.

2020/06/06 19:39:07.859 INFO  o.a.f.r.j.slotpool.SlotPoolImpl - Suspending SlotPool.

2020/06/06 19:39:07.859 INFO  o.a.f.runtime.jobmaster.JobMaster - Close ResourceManager connection d28e9b9e1fc1ba78c2ed010070518057: JobManager is shutting down..

2020/06/06 19:39:07.859 INFO  o.a.f.r.j.slotpool.SlotPoolImpl - Stopping SlotPool.

2020/06/06 19:39:07.859 INFO  o.a.f.r.r.StandaloneResourceManager - Disconnect job manager [hidden email]://flink@flink-job-cluster:6123/user/jobmanager_0 for job 00000000000000000000000000000000 from the resource manager.

2020/06/06 19:39:07.860 INFO  o.a.f.r.l.ZooKeeperLeaderElectionService - Stopping ZooKeeperLeaderElectionService ZooKeeperLeaderElectionService{leaderPath='/leader/00000000000000000000000000000000/job_manager_lock'}.

2020/06/06 19:39:07.868 INFO  o.a.f.r.j.MiniDispatcherRestEndpoint - Removing cache directory /tmp/flink-web-ef940924-348b-461c-ab53-255a914ed43a/flink-web-ui

2020/06/06 19:39:07.870 INFO  o.a.f.r.l.ZooKeeperLeaderElectionService - Stopping ZooKeeperLeaderElectionService ZooKeeperLeaderElectionService{leaderPath='/leader/rest_server_lock'}.

2020/06/06 19:39:07.870 INFO  o.a.f.r.j.MiniDispatcherRestEndpoint - Shut down complete.

2020/06/06 19:39:07.870 INFO  o.a.f.r.r.StandaloneResourceManager - Shut down cluster because application is in FAILED, diagnostics null.

2020/06/06 19:39:07.870 INFO  o.a.f.r.l.ZooKeeperLeaderRetrievalService - Stopping ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.

2020/06/06 19:39:07.871 INFO  o.a.f.r.l.ZooKeeperLeaderRetrievalService - Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.

2020/06/06 19:39:07.871 INFO  o.a.f.r.dispatcher.MiniDispatcher - Stopping dispatcher akka.tcp://flink@flink-job-cluster:6123/user/dispatcher.

2020/06/06 19:39:07.871 INFO  o.a.f.r.dispatcher.MiniDispatcher - Stopping all currently running jobs of dispatcher akka.tcp://flink@flink-job-cluster:6123/user/dispatcher.

2020/06/06 19:39:07.871 INFO  o.a.f.r.r.s.SlotManagerImpl - Closing the SlotManager.

2020/06/06 19:39:07.871 INFO  o.a.f.r.r.s.SlotManagerImpl - Suspending the SlotManager.

2020/06/06 19:39:07.871 INFO  o.a.f.r.l.ZooKeeperLeaderElectionService - Stopping ZooKeeperLeaderElectionService ZooKeeperLeaderElectionService{leaderPath='/leader/resource_manager_lock'}.

2020/06/06 19:39:07.871 INFO  o.a.f.r.l.ZooKeeperLeaderRetrievalService - Stopping ZooKeeperLeaderRetrievalService /leader/00000000000000000000000000000000/job_manager_lock.

2020/06/06 19:39:07.974 INFO  o.a.f.r.r.h.l.b.StackTraceSampleCoordinator - Shutting down stack trace sample coordinator.

2020/06/06 19:39:07.975 INFO  o.a.f.r.l.ZooKeeperLeaderElectionService - Stopping ZooKeeperLeaderElectionService ZooKeeperLeaderElectionService{leaderPath='/leader/dispatcher_lock'}.

2020/06/06 19:39:07.975 INFO  o.a.f.r.dispatcher.MiniDispatcher - Stopped dispatcher akka.tcp://flink@flink-job-cluster:6123/user/dispatcher.

2020/06/06 19:39:07.975 INFO  o.a.flink.runtime.blob.BlobServer - Stopped BLOB server at 0.0.0.0:6124

2020/06/06 19:39:07.975 INFO  o.a.f.r.h.z.ZooKeeperHaServices - Close and clean up all data for ZooKeeperHaServices.

2020/06/06 19:39:08.085 INFO  o.a.f.s.c.o.a.c.f.i.CuratorFrameworkImpl - backgroundOperationsLoop exiting

2020/06/06 19:39:08.090 INFO  o.a.f.s.z.o.a.zookeeper.ClientCnxn - EventThread shut down for session: 0x17282452e8c0823

2020/06/06 19:39:08.090 INFO  o.a.f.s.z.o.a.zookeeper.ZooKeeper - Session: 0x17282452e8c0823 closed

2020/06/06 19:39:08.091 INFO  o.a.f.r.rpc.akka.AkkaRpcService - Stopping Akka RPC service.

2020/06/06 19:39:08.093 INFO  o.a.f.r.rpc.akka.AkkaRpcService - Stopping Akka RPC service.

2020/06/06 19:39:08.096 INFO  a.r.RemoteActorRefProvider$RemotingTerminator - Shutting down remote daemon.

2020/06/06 19:39:08.097 INFO  a.r.RemoteActorRefProvider$RemotingTerminator - Remote daemon shut down; proceeding with flushing remote transports.

2020/06/06 19:39:08.099 INFO  a.r.RemoteActorRefProvider$RemotingTerminator - Shutting down remote daemon.

2020/06/06 19:39:08.099 INFO  a.r.RemoteActorRefProvider$RemotingTerminator - Remote daemon shut down; proceeding with flushing remote transports.

2020/06/06 19:39:08.108 INFO  a.r.RemoteActorRefProvider$RemotingTerminator - Remoting shut down.

2020/06/06 19:39:08.114 INFO  a.r.RemoteActorRefProvider$RemotingTerminator - Remoting shut down.

2020/06/06 19:39:08.123 INFO  o.a.f.r.rpc.akka.AkkaRpcService - Stopped Akka RPC service.

2020/06/06 19:39:08.124 INFO  o.a.f.r.entrypoint.ClusterEntrypoint - Terminating cluster entrypoint process StandaloneJobClusterEntryPoint with exit code 1443.

Can you please help?

 

Thanks

Sandeep Kathula


Re: Flink not restoring from checkpoint when job manager fails even with HA

Yun Tang
Hi Sandeep

In general, Flink assigns a unique job ID to each job and uses that ID as the ZooKeeper path. So when the checkpoint store shuts down with a globally terminal state (e.g. FAILED, CANCELLED), it has to clean up its paths in ZooKeeper to avoid a resource leak, because the next job would have a different job ID.

I think you assigned the special job ID '00000000000000000000000000000000' to make restoring easier. The ZooKeeper path is deleted as expected, while the externalized checkpoint at 's3://s3_bucket/sessionization_test/checkpoints/00000000000000000000000000000000/chk-11' is in fact not discarded (note the "not discarded" log line). If you want to resume from the previous job, pass that retained checkpoint path to the -s option of flink run when you resubmit. [1]



Best
Yun Tang



Re: Flink not restoring from checkpoint when job manager fails even with HA

Vijay Bhaskar
Hi Yun

If we start using the special job ID and redeploy the job, will it then be assigned the special job ID again, or a new job ID?

Regards
Bhaskar



Re: Flink not restoring from checkpoint when job manager fails even with HA

Yun Tang
Hi Bhaskar

We strongly discourage using such a hack configuration to make a job always run with the same special job ID.
If you stick with it, all runs of this job graph will have the same job ID.


Best
Yun Tang



Re: Flink not restoring from checkpoint when job manager fails even with HA

Vijay Bhaskar
Hi Yun
Let me put my question another way:

1) The first time I deployed my job, I got an ID from Flink, let's say "abcdef" (and I remembered the ID Flink gave me by storing it in another persistent store).
2) The next time, my job failed. I use my stored job ID ("abcdef") to retrieve the retained checkpoint.

After I redeploy my job, will it get the job ID "abcdef" or a new one?



Re: Flink not restoring from checkpoint when job manager fails even with HA

Yun Tang
Hi Bhaskar

By default, you will get a new job id.

There is a hack (a hidden method) to set the job ID, but it is not meant to be used by users.

Best
Yun Tang
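
(A minimal sketch of how the newly assigned job ID could be captured at submission time and stored externally, as Bhaskar describes above. This assumes Flink 1.10's JobClient API and a client-side submission rather than the standalone job cluster used in this thread; the class name, placeholder pipeline, and println are illustrative only.)

import org.apache.flink.api.common.JobID;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SubmitAndRecordJobId {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder pipeline; the real job would build its sources, operators, and sinks here.
        env.fromElements(1, 2, 3).print();

        // Submit without blocking and ask Flink which job ID it assigned to this run.
        JobClient client = env.executeAsync("sessionization_test");
        JobID jobId = client.getJobID();

        // Persist the ID somewhere durable so the retained checkpoint directory
        // <state.checkpoints.dir>/<jobId>/chk-<n> can be located later and passed
        // to the -s option on the next run. Printing stands in for real persistence.
        System.out.println("Submitted job with ID: " + jobId);
    }
}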
