Old job resurrected during HA failover

classic Classic list List threaded Threaded
7 messages Options
Reply | Threaded
Open this post in threaded view
|

Old job resurrected during HA failover

Elias Levy
For the second time in as many months we've had an old job resurrected during HA failover in a 1.4.2 standalone cluster.  Failover was initiated when the leading JM lost its connection to ZK.  I opened FLINK-10011 with the details.

We are using S3 with the Presto adapter as our distributed store.  After we cleaned up the cluster by shutting down the two jobs started after failover and starting a new job from the last known good checkpoint from the single job running in the cluster before failover, the HA recovery directory looks as follows:

3cmd ls s3://bucket/flink/cluster_1/recovery/
 DIR s3://bucket/flink/cluster_1/recovery/some_job/}}
2018-07-31 17:33 35553 s3://bucket/flink/cluster_1/recovery/completedCheckpoint12e06bef01c5
2018-07-31 17:34 35553 s3://bucket/flink/cluster_1/recovery/completedCheckpoint187e0d2ae7cb
2018-07-31 17:32 35553 s3://bucket/flink/cluster_1/recovery/completedCheckpoint22fc8ca46f02
2018-06-12 20:01 284626 s3://bucket/flink/cluster_1/recovery/submittedJobGraph7f627a661cec
2018-07-30 23:01 285257 s3://bucket/flink/cluster_1/recovery/submittedJobGraphf3767780c00c

submittedJobGraph7f627a661cec appears to be job 2a4eff355aef849c5ca37dbac04f2ff1, the long running job that failed during the ZK failover

submittedJobGraphf3767780c00c appears to be job d77948df92813a68ea6dfd6783f40e7e, the job we started restoring from a checkpoint after shutting down the duplicate jobs

Should submittedJobGraph7f627a661cec exist in the recovery directory if 2a4eff355aef849c5ca37dbac04f2ff1 is no longer running?


Reply | Threaded
Open this post in threaded view
|

Re: Old job resurrected during HA failover

vino yang
Hi Elias,

If a job is explicitly canceled, its jobgraph node on ZK will be deleted. 
However, it is worth noting here that Flink enables a background thread to asynchronously delete the jobGraph node, 
so there may be cases where it cannot be deleted. 
On the other hand, the jobgraph node on ZK is the only basis for the JM leader to restore the job. 
There may be an unexpected recovery or an old job resurrection.

Thanks, vino.

2018-08-01 23:13 GMT+08:00 Elias Levy <[hidden email]>:
For the second time in as many months we've had an old job resurrected during HA failover in a 1.4.2 standalone cluster.  Failover was initiated when the leading JM lost its connection to ZK.  I opened FLINK-10011 with the details.

We are using S3 with the Presto adapter as our distributed store.  After we cleaned up the cluster by shutting down the two jobs started after failover and starting a new job from the last known good checkpoint from the single job running in the cluster before failover, the HA recovery directory looks as follows:

3cmd ls s3://bucket/flink/cluster_1/recovery/
 DIR s3://bucket/flink/cluster_1/recovery/some_job/}}
2018-07-31 17:33 35553 s3://bucket/flink/cluster_1/recovery/completedCheckpoint12e06bef01c5
2018-07-31 17:34 35553 s3://bucket/flink/cluster_1/recovery/completedCheckpoint187e0d2ae7cb
2018-07-31 17:32 35553 s3://bucket/flink/cluster_1/recovery/completedCheckpoint22fc8ca46f02
2018-06-12 20:01 284626 s3://bucket/flink/cluster_1/recovery/submittedJobGraph7f627a661cec
2018-07-30 23:01 285257 s3://bucket/flink/cluster_1/recovery/submittedJobGraphf3767780c00c

submittedJobGraph7f627a661cec appears to be job 2a4eff355aef849c5ca37dbac04f2ff1, the long running job that failed during the ZK failover

submittedJobGraphf3767780c00c appears to be job d77948df92813a68ea6dfd6783f40e7e, the job we started restoring from a checkpoint after shutting down the duplicate jobs

Should submittedJobGraph7f627a661cec exist in the recovery directory if 2a4eff355aef849c5ca37dbac04f2ff1 is no longer running?



Reply | Threaded
Open this post in threaded view
|

Re: Old job resurrected during HA failover

Elias Levy
Vino,

Thanks for the reply.  Looking in ZK I see:

[zk: localhost:2181(CONNECTED) 5] ls /flink/cluster_1/jobgraphs
[d77948df92813a68ea6dfd6783f40e7e, 2a4eff355aef849c5ca37dbac04f2ff1]

Again we see HA state for job 2a4eff355aef849c5ca37dbac04f2ff1, even though that job is no longer running (it was canceled while it was in a loop attempting to restart, but failing because of a lack of cluster slots).

Any idea why that may be the case?


On Wed, Aug 1, 2018 at 8:38 AM vino yang <[hidden email]> wrote:
If a job is explicitly canceled, its jobgraph node on ZK will be deleted. 
However, it is worth noting here that Flink enables a background thread to asynchronously delete the jobGraph node, 
so there may be cases where it cannot be deleted. 
On the other hand, the jobgraph node on ZK is the only basis for the JM leader to restore the job. 
There may be an unexpected recovery or an old job resurrection.
Reply | Threaded
Open this post in threaded view
|

Re: Old job resurrected during HA failover

Elias Levy
I can see in the logs that the JM 1 (10.210.22.167), that one that became leader after failover, thinks it deleted the 2a4eff355aef849c5ca37dbac04f2ff1 job from ZK when it was canceled:

July 30th 2018, 15:32:27.231 Trying to cancel job with ID 2a4eff355aef849c5ca37dbac04f2ff1.
July 30th 2018, 15:32:27.232 Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state RESTARTING to CANCELED.
July 30th 2018, 15:32:27.232 Stopping checkpoint coordinator for job 2a4eff355aef849c5ca37dbac04f2ff1
July 30th 2018, 15:32:27.239 Removed job graph 2a4eff355aef849c5ca37dbac04f2ff1 from ZooKeeper.
July 30th 2018, 15:32:27.245 Removing /flink/cluster_1/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1 from ZooKeeper
July 30th 2018, 15:32:27.251 Removing /checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1 from ZooKeeper

Both /flink/cluster_1/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1 and /flink/cluster_1/checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1 no longer exist, but for some reason the job graph as is still there.

Looking at the ZK logs I find the problem:

July 30th 2018, 15:32:27.241 Got user-level KeeperException when processing sessionid:0x2000001d2330001 type:delete cxid:0x434c zxid:0x60009dd94 txntype:-1 reqpath:n/a Error Path:/flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1 Error:KeeperErrorCode = Directory not empty for /flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1

Looking in ZK, we see:

[zk: localhost:2181(CONNECTED) 0] ls /flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1
[d833418c-891a-4b5e-b983-080be803275c]

From the comments in ZooKeeperStateHandleStore.java I gather that this child node is used as a deletion lock.  Looking at the contents of this ephemeral lock node:

[zk: localhost:2181(CONNECTED) 16] get /flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1/d833418c-891a-4b5e-b983-080be803275c
10.210.42.62
cZxid = 0x60002ffa7
ctime = Tue Jun 12 20:01:26 UTC 2018
mZxid = 0x60002ffa7
mtime = Tue Jun 12 20:01:26 UTC 2018
pZxid = 0x60002ffa7
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x30000003f4a0003
dataLength = 12
numChildren = 0

and compared to the ephemeral node lock of the currently running job:

[zk: localhost:2181(CONNECTED) 17] get /flink/cluster_1/jobgraphs/d77948df92813a68ea6dfd6783f40e7e/596a4add-9f5c-4113-99ec-9c942fe91172
10.210.22.167
cZxid = 0x60009df4b
ctime = Mon Jul 30 23:01:04 UTC 2018
mZxid = 0x60009df4b
mtime = Mon Jul 30 23:01:04 UTC 2018
pZxid = 0x60009df4b
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x2000001d2330001
dataLength = 13
numChildren = 0

Assuming the content of the nodes represent the owner, it seems the job graph for the old canceled job, 2a4eff355aef849c5ca37dbac04f2ff1, is locked by the previous JM leader, JM 2(10.210.42.62), while the running job locked by the current JM leader, JM 1 (10.210.22.167).

Somehow the previous leader, JM 2, did not give up the lock when leadership failed over to JM 2.  

Shouldn't something call ZooKeeperStateHandleStore.releaseAll during HA failover to release the locks on the graphs?


On Wed, Aug 1, 2018 at 9:49 AM Elias Levy <[hidden email]> wrote:
Thanks for the reply.  Looking in ZK I see:

[zk: localhost:2181(CONNECTED) 5] ls /flink/cluster_1/jobgraphs
[d77948df92813a68ea6dfd6783f40e7e, 2a4eff355aef849c5ca37dbac04f2ff1]

Again we see HA state for job 2a4eff355aef849c5ca37dbac04f2ff1, even though that job is no longer running (it was canceled while it was in a loop attempting to restart, but failing because of a lack of cluster slots).

Any idea why that may be the case?
Reply | Threaded
Open this post in threaded view
|

Re: Old job resurrected during HA failover

vino yang
Hi Elias,

Your analysis is correct, yes, in theory the old jobgraph should be deleted, but Flink currently uses the method of locking and asynchronously deleting Path, so that it can not give you the acknowledgment of deleting, so this is a risk point.

cc Till, there have been users who have encountered this problem before. I personally think that asynchronous deletion may be risky, which may cause JM to be revived by the cancel job after the failover.

Thanks, vino.

2018-08-02 5:25 GMT+08:00 Elias Levy <[hidden email]>:
I can see in the logs that the JM 1 (10.210.22.167), that one that became leader after failover, thinks it deleted the 2a4eff355aef849c5ca37dbac04f2ff1 job from ZK when it was canceled:

July 30th 2018, 15:32:27.231 Trying to cancel job with ID 2a4eff355aef849c5ca37dbac04f2ff1.
July 30th 2018, 15:32:27.232 Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state RESTARTING to CANCELED.
July 30th 2018, 15:32:27.232 Stopping checkpoint coordinator for job 2a4eff355aef849c5ca37dbac04f2ff1
July 30th 2018, 15:32:27.239 Removed job graph 2a4eff355aef849c5ca37dbac04f2ff1 from ZooKeeper.
July 30th 2018, 15:32:27.245 Removing /flink/cluster_1/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1 from ZooKeeper
July 30th 2018, 15:32:27.251 Removing /checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1 from ZooKeeper

Both /flink/cluster_1/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1 and /flink/cluster_1/checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1 no longer exist, but for some reason the job graph as is still there.

Looking at the ZK logs I find the problem:

July 30th 2018, 15:32:27.241 Got user-level KeeperException when processing sessionid:0x2000001d2330001 type:delete cxid:0x434c zxid:0x60009dd94 txntype:-1 reqpath:n/a Error Path:/flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1 Error:KeeperErrorCode = Directory not empty for /flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1

Looking in ZK, we see:

[zk: localhost:2181(CONNECTED) 0] ls /flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1
[d833418c-891a-4b5e-b983-080be803275c]

From the comments in ZooKeeperStateHandleStore.java I gather that this child node is used as a deletion lock.  Looking at the contents of this ephemeral lock node:

[zk: localhost:2181(CONNECTED) 16] get /flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1/d833418c-891a-4b5e-b983-080be803275c
10.210.42.62
cZxid = 0x60002ffa7
ctime = Tue Jun 12 20:01:26 UTC 2018
mZxid = 0x60002ffa7
mtime = Tue Jun 12 20:01:26 UTC 2018
pZxid = 0x60002ffa7
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x30000003f4a0003
dataLength = 12
numChildren = 0

and compared to the ephemeral node lock of the currently running job:

[zk: localhost:2181(CONNECTED) 17] get /flink/cluster_1/jobgraphs/d77948df92813a68ea6dfd6783f40e7e/596a4add-9f5c-4113-99ec-9c942fe91172
10.210.22.167
cZxid = 0x60009df4b
ctime = Mon Jul 30 23:01:04 UTC 2018
mZxid = 0x60009df4b
mtime = Mon Jul 30 23:01:04 UTC 2018
pZxid = 0x60009df4b
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x2000001d2330001
dataLength = 13
numChildren = 0

Assuming the content of the nodes represent the owner, it seems the job graph for the old canceled job, 2a4eff355aef849c5ca37dbac04f2ff1, is locked by the previous JM leader, JM 2(10.210.42.62), while the running job locked by the current JM leader, JM 1 (10.210.22.167).

Somehow the previous leader, JM 2, did not give up the lock when leadership failed over to JM 2.  

Shouldn't something call ZooKeeperStateHandleStore.releaseAll during HA failover to release the locks on the graphs?


On Wed, Aug 1, 2018 at 9:49 AM Elias Levy <[hidden email]> wrote:
Thanks for the reply.  Looking in ZK I see:

[zk: localhost:2181(CONNECTED) 5] ls /flink/cluster_1/jobgraphs
[d77948df92813a68ea6dfd6783f40e7e, 2a4eff355aef849c5ca37dbac04f2ff1]

Again we see HA state for job 2a4eff355aef849c5ca37dbac04f2ff1, even though that job is no longer running (it was canceled while it was in a loop attempting to restart, but failing because of a lack of cluster slots).

Any idea why that may be the case?

Reply | Threaded
Open this post in threaded view
|

Re: Old job resurrected during HA failover

Elias Levy
Till,

Thoughts?

On Wed, Aug 1, 2018 at 7:34 PM vino yang <[hidden email]> wrote:
Your analysis is correct, yes, in theory the old jobgraph should be deleted, but Flink currently uses the method of locking and asynchronously deleting Path, so that it can not give you the acknowledgment of deleting, so this is a risk point.

cc Till, there have been users who have encountered this problem before. I personally think that asynchronous deletion may be risky, which may cause JM to be revived by the cancel job after the failover.
Reply | Threaded
Open this post in threaded view
|

Re: Old job resurrected during HA failover

Till Rohrmann
Hi Elias and Vino,

sorry for the late reply. 

I think your analysis is pretty much to the point. The current implementation does not properly respect the situation with multiple standby JobManagers. In the single JobManager case, a loss of leadership either means that the JobManager has died and, thus, also its ZooKeeper connection which causes the ephemeral nodes to disappear or the same JobManager will be reelected. In the case of multiple standby JobManagers a lost leadership caused by a ZooKeeper hickup could cause that a different JM will become the leader while the old leader still keeps its connection to ZooKeeper (e.g. after reestablishing it). In this case, the ephemeral nodes won't be deleted automatically. Consequently, it is necessary to explicitly free all locked resources as Elias has proposed. This problem affects the legacy as well as the new mode. This is a critical issue which we should fix asap.

Thanks for reporting this issue and the in-depth analysis of the cause Elias!

A somewhat related problem is that the actual ZooKeeper delete operation is executed in a background thread without proper failure handling. As far as I can tell, we only log on DEBUG that the node could not be deleted. I think this should be fixed as well because then the problem would be easier to identify.

Cheers,
Till

On Fri, Aug 3, 2018 at 5:42 PM Elias Levy <[hidden email]> wrote:
Till,

Thoughts?

On Wed, Aug 1, 2018 at 7:34 PM vino yang <[hidden email]> wrote:
Your analysis is correct, yes, in theory the old jobgraph should be deleted, but Flink currently uses the method of locking and asynchronously deleting Path, so that it can not give you the acknowledgment of deleting, so this is a risk point.

cc Till, there have been users who have encountered this problem before. I personally think that asynchronous deletion may be risky, which may cause JM to be revived by the cancel job after the failover.