HA lock nodes, Checkpoints, and JobGraphs not being removed after failure

dyana.rose@salecycle.com
Originally posted to the dev group, but it's easy for things to get buried there, and this may concern other HA users.

Flink v1.7.1

After a Flink reboot we've been seeing an unexpected issue: excess retained checkpoints cannot be removed from ZooKeeper after a new checkpoint is created.

I believe I've got my head around the role of ZK and lockNodes in Checkpointing after going through the code. Could you check my logic on this and add any insight, especially if I've got it wrong?

The situation:
1) Say we run JM1 and JM2 and retain 10 checkpoints and are running in HA with S3 as the backing store.

2) JM1 and JM2 start up and each instance of ZooKeeperStateHandleStore has its own lockNode UUID. JM1 is elected leader.

3) We submit a job; the JobGraph is added to ZK and locked using JM1's JobGraph lockNode.

4) Checkpoints start rolling in, latest 10 are retained in ZK using JM1's checkpoint lockNode. We continue running, and checkpoints are successfully being created and excess checkpoints removed.

5) Both JM1 and JM2 are now rebooted.

6) The JobGraph is recovered by the leader, the job restarts from the latest checkpoint.
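
To show what I mean by lockNode, here's a minimal sketch of the pattern as I read it out of ZooKeeperStateHandleStore. This is not Flink's actual code: the class name, the paths, and the PERSISTENT create mode are just my assumptions for illustration, written against the plain ZooKeeper client.

import java.util.UUID;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

// Illustrative only: each store instance (i.e. each JM process) generates its own
// lock node name, so a restarted JM ends up with a different UUID than before.
public class LockNodeSketch {

    private final ZooKeeper zk;
    private final String lockNode = UUID.randomUUID().toString();

    public LockNodeSketch(ZooKeeper zk) {
        this.zk = zk;
    }

    // Store a pointer to the checkpoint data in S3 and "lock" it for this instance.
    public void addAndLock(String checkpointPath, byte[] stateHandlePointer) throws Exception {
        // The checkpoint entry: a znode whose payload points at the real state in S3.
        zk.create(checkpointPath, stateHandlePointer,
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        // The lock: a child znode named after this instance's UUID. Flink may well use
        // a different create mode here; PERSISTENT is an assumption for the sketch.
        zk.create(checkpointPath + "/" + lockNode, new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }
}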

Now after every new checkpoint we see in the ZooKeeper logs:
INFO [ProcessThread(sid:3 cport:-1)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x10000047715000d type:delete cxid:0x210 zxid:0x700001091 txntype:-1 reqpath:n/a Error Path:/flink/job-name/checkpoints/2fa0d694e245f5ec1f709630c7c7bf69/0000000000000057813 Error:KeeperErrorCode = Directory not empty for /flink/job-name/checkpoints/2fa0d694e245f5ec1f709630c7c7bf69/000000000000005781
with an increasing checkpoint id on each subsequent call.

When JM1 and JM2 were rebooted the lockNode UUIDs would have rolled, right? As the old checkpoints were created under the old UUID, the new JMs will never be able to remove the old retained checkpoints from ZooKeeper.

Is that correct?
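
For what it's worth, the "Directory not empty" error is exactly what plain ZooKeeper gives you when you delete a parent znode that still has a child, here the old JM's lock node. A hedged sketch of what I think the removal path boils down to (again illustrative, not Flink's code):

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;

public class RemoveSketch {

    // Drop a retained checkpoint: release our own lock, then delete the entry.
    // If a lock child created by a previous JM incarnation (old UUID) is still
    // there, the parent delete fails with NotEmptyException / "Directory not empty".
    static void releaseAndTryRemove(ZooKeeper zk, String checkpointPath, String ourLockNode)
            throws Exception {
        try {
            zk.delete(checkpointPath + "/" + ourLockNode, -1); // release our lock (any version)
        } catch (KeeperException.NoNodeException ignored) {
            // We never locked this entry under our current UUID (e.g. after a restart).
        }
        try {
            zk.delete(checkpointPath, -1); // fails if foreign lock children remain
        } catch (KeeperException.NotEmptyException e) {
            // This is the situation in the ZooKeeper log above: the old JM's lock
            // node is still a child, so the checkpoint entry can never be removed.
        }
    }
}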

If so, would this also happen with JobGraphs in the following situation (we saw this just recently where we had a JobGraph for a cancelled job still in ZK):

Steps 1 through 3 above, then:
4) JM1 fails over to JM2, the job keeps running uninterrupted. JM1 restarts.

5) Some time later, while JM2 is still leader, we hard-cancel the job and restart the JMs.

In this case JM2 would successfully remove the job data from S3, but because its lockNode differs from JM1's it cannot delete the lock file in the jobgraph folder, and so can't remove the JobGraph. Then Flink restarts and tries to recover the JobGraph it has found, but the S3 files have already been deleted.
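
If that's right, an easy way to confirm it would be to list the jobgraph/checkpoint entries and their lock children and compare the child UUIDs with the lockNodes of the currently running JMs. Just a diagnostic sketch; the connect string and the jobgraphs path are ours and would need adjusting.

import org.apache.zookeeper.ZooKeeper;

public class StaleLockDiagnostic {

    // Print every entry under a Flink HA path together with its lock children.
    static void listLocks(ZooKeeper zk, String basePath) throws Exception {
        for (String entry : zk.getChildren(basePath, false)) {
            String entryPath = basePath + "/" + entry;
            // Each child of an entry is a lock node named after some JM instance's
            // UUID; a UUID that belongs to no live JM points at a stale lock.
            System.out.println(entryPath + " locks=" + zk.getChildren(entryPath, false));
        }
    }

    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30_000, event -> { });
        listLocks(zk, "/flink/job-name/jobgraphs");
        listLocks(zk, "/flink/job-name/checkpoints/2fa0d694e245f5ec1f709630c7c7bf69");
        zk.close();
    }
}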

Possible related closed issues (fixes went in v1.7.0): https://issues.apache.org/jira/browse/FLINK-10184 and https://issues.apache.org/jira/browse/FLINK-10255

Thanks for any insight,
Dyana

Re: HA lock nodes, Checkpoints, and JobGraphs not being removed after failure

Till Rohrmann
Cross-linking the dev ML thread [1]. Let us continue the discussion there.


Cheers,
Till
