Flink zookeeper HA problem

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink zookeeper HA problem

mark.harris
Hi,

We've got a problem trying to set up two flink clusters using the same zookeeper instance that we wonder if anyone has seen before or has any advice on.

Our setup is two AWS EMR clusters running flink (v1.7.2) that are both trying to use a single zookeeper cluster (v3.4.6-1569965) for their HA configuration. As a cost saving measure, we have the clusters configured to terminate at 19:00 and restart at 08:00 each day. 

Cluster 1 is configured / started like this:

/usr/lib/flink/bin/yarn-session.sh -d -n 6 -tm 9472 -jm 2048 -s 8

high-availability: zookeeper
high-availability.cluster-id: /cluster1
high-availability.storageDir: s3://xxxx/flink/cluster1/recovery
high-availability.zookeeper.path.root: /flink
high-availability.zookeeper.quorum: zk0:2181,zk1:2181,zk2:2181


Cluster 2 is configured / started like this:

/usr/lib/flink/bin/yarn-session.sh -d -n 2 -tm 6144 -jm 6144 -s 1

high-availability: zookeeper
high-availability.cluster-id: /cluster2
high-availability.storageDir: s3://xxxx/flink/cluster2/recovery
high-availability.zookeeper.path.root: /flink
high-availability.zookeeper.quorum: zk0:2181,zk1:2181,zk2:2181


Since upgrading to flink 1.7.2 from 1.6.1, we've found that whichever cluster happens to start second fails to start with the following error:

2019-03-07 10:59:05,142 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Starting ZooKeeperLeaderRetrievalService /leader/rest_server_lock.
2019-03-07 10:59:05,166 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Starting ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
2019-03-07 10:59:05,171 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Starting ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
2019-03-07 11:00:05,184 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Stopping ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
2019-03-07 11:00:05,187 INFO  org.apache.flink.runtime.rest.RestClient                      - Shutting down rest endpoint.
2019-03-07 11:00:05,194 INFO  org.apache.flink.runtime.rest.RestClient                      - Rest endpoint shutdown complete.
2019-03-07 11:00:05,194 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Stopping ZooKeeperLeaderRetrievalService /leader/rest_server_lock.
2019-03-07 11:00:05,195 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Stopping ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
2019-03-07 11:00:05,195 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl  - backgroundOperationsLoop exiting
2019-03-07 11:00:05,200 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Session: 0x6b691520c6774910 closed
2019-03-07 11:00:05,200 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - EventThread shut down for session: 0x6b691520c6774910
2019-03-07 11:00:05,408 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl         - Killed application application_1551946032263_0005
2019-03-07 11:00:05,488 ERROR org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Error while running the Flink Yarn session.
org.apache.flink.util.FlinkException: Could not write the Yarn connection information.
	at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:636)
	at org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$2(FlinkYarnSessionCli.java:810)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:810)
Caused by: org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Could not retrieve the leader address and leader session ID.
	at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderConnectionInfo(LeaderRetrievalUtils.java:134)
	at org.apache.flink.client.program.rest.RestClusterClient.getClusterConnectionInfo(RestClusterClient.java:513)
	at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:613)
	... 6 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [60000 milliseconds]
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:255)
	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:259)
	at scala.concurrent.Await$.$anonfun$result$1(package.scala:215)
	at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
	at scala.concurrent.Await$.result(package.scala:142)
	at scala.concurrent.Await.result(package.scala)
	at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderConnectionInfo(LeaderRetrievalUtils.java:132)
	... 8 more

Further attempts to start the cluster fail in the same way, and the only solution seems to be to clear out the HA information in zookeeper (and the filesystem). After doing this, the cluster starts successfully without running any jobs.
Cluster 2 usually starts up first and succeeds, but we've seen it both ways around. We've also tried swapping the values of cluster-id and path.root for each cluster, which has the same problem.


The problem appears to be that it can't find the leader/dispatcher. Turning the logging up to DEBUG, I can see some suggestion in the jobmanager log that this election has completed successfully:
2019/03/07 10:59:02,781 DEBUG org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  - Grant leadership to contender akka.tcp://[hidden email]:44315/user/dispatcher with session ID fd044f52-b05c-4feb-9ccf-4c8f18ddf18c.
2019/03/07 10:59:08,035 DEBUG org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Dispatcher akka.tcp://[hidden email]:44315/user/dispatcher accepted leadership with fencing token 9ccf4c8f18ddf18cfd044f52b05c4feb. Start recovered jobs.

There don't seem to be any errors in the jobmanager log.

Any advice on how to start these two clusters or suggestions for other avenues to solve or debug would be really gratefully received.

Best regards,

Mark Harris



The information contained in or attached to this email is intended only for the use of the individual or entity to which it is addressed. If you are not the intended recipient, or a person responsible for delivering it to the intended recipient, you are not authorised to and must not disclose, copy, distribute, or retain this message or any part of it. It may contain information which is confidential and/or covered by legal professional or other privilege under applicable law.

The views expressed in this email are not necessarily the views of Centrica plc or its subsidiaries, and the company, its directors, officers or employees make no representation or accept any liability for its accuracy or completeness unless expressly stated to the contrary.

Additional regulatory disclosures may be found here: https://www.centrica.com/privacy-cookies-and-legal-disclaimer#email

PH Jones is a trading name of British Gas Social Housing Limited. British Gas Social Housing Limited (company no: 01026007), British Gas Trading Limited (company no: 03078711), British Gas Services Limited (company no: 3141243), British Gas Insurance Limited (company no: 06608316), British Gas New Heating Limited (company no: 06723244), British Gas Services (Commercial) Limited (company no: 07385984) and Centrica Energy (Trading) Limited (company no: 02877397) are all wholly owned subsidiaries of Centrica plc (company no: 3033654). Each company is registered in England and Wales with a registered office at Millstream, Maidenhead Road, Windsor, Berkshire SL4 5GD.

British Gas Insurance Limited is authorised by the Prudential Regulation Authority and regulated by the Financial Conduct Authority and the Prudential Regulation Authority. British Gas Services Limited and Centrica Energy (Trading) Limited are authorised and regulated by the Financial Conduct Authority. British Gas Trading Limited is an appointed representative of British Gas Services Limited which is authorised and regulated by the Financial Conduct Authority.
Reply | Threaded
Open this post in threaded view
|

Re: Flink zookeeper HA problem

mark.harris
Sometimes it's the simplest things - the 40 or so jobs we have seem to take longer to reload on cluster start up than in flink 1.6, and it was timing out. Increasing the value for the timeout over 5 minutes and everything works again.

From: Harris, Mark <[hidden email]>
Sent: 07 March 2019 12:33
To: user
Subject: Flink zookeeper HA problem
 
Hi,

We've got a problem trying to set up two flink clusters using the same zookeeper instance that we wonder if anyone has seen before or has any advice on.

Our setup is two AWS EMR clusters running flink (v1.7.2) that are both trying to use a single zookeeper cluster (v3.4.6-1569965) for their HA configuration. As a cost saving measure, we have the clusters configured to terminate at 19:00 and restart at 08:00 each day. 

Cluster 1 is configured / started like this:

/usr/lib/flink/bin/yarn-session.sh -d -n 6 -tm 9472 -jm 2048 -s 8

high-availability: zookeeper
high-availability.cluster-id: /cluster1
high-availability.storageDir: s3://xxxx/flink/cluster1/recovery
high-availability.zookeeper.path.root: /flink
high-availability.zookeeper.quorum: zk0:2181,zk1:2181,zk2:2181


Cluster 2 is configured / started like this:

/usr/lib/flink/bin/yarn-session.sh -d -n 2 -tm 6144 -jm 6144 -s 1

high-availability: zookeeper
high-availability.cluster-id: /cluster2
high-availability.storageDir: s3://xxxx/flink/cluster2/recovery
high-availability.zookeeper.path.root: /flink
high-availability.zookeeper.quorum: zk0:2181,zk1:2181,zk2:2181


Since upgrading to flink 1.7.2 from 1.6.1, we've found that whichever cluster happens to start second fails to start with the following error:

2019-03-07 10:59:05,142 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Starting ZooKeeperLeaderRetrievalService /leader/rest_server_lock.
2019-03-07 10:59:05,166 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Starting ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
2019-03-07 10:59:05,171 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Starting ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
2019-03-07 11:00:05,184 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Stopping ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
2019-03-07 11:00:05,187 INFO  org.apache.flink.runtime.rest.RestClient                      - Shutting down rest endpoint.
2019-03-07 11:00:05,194 INFO  org.apache.flink.runtime.rest.RestClient                      - Rest endpoint shutdown complete.
2019-03-07 11:00:05,194 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Stopping ZooKeeperLeaderRetrievalService /leader/rest_server_lock.
2019-03-07 11:00:05,195 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Stopping ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
2019-03-07 11:00:05,195 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl  - backgroundOperationsLoop exiting
2019-03-07 11:00:05,200 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Session: 0x6b691520c6774910 closed
2019-03-07 11:00:05,200 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - EventThread shut down for session: 0x6b691520c6774910
2019-03-07 11:00:05,408 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl         - Killed application application_1551946032263_0005
2019-03-07 11:00:05,488 ERROR org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Error while running the Flink Yarn session.
org.apache.flink.util.FlinkException: Could not write the Yarn connection information.
	at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:636)
	at org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$2(FlinkYarnSessionCli.java:810)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:810)
Caused by: org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Could not retrieve the leader address and leader session ID.
	at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderConnectionInfo(LeaderRetrievalUtils.java:134)
	at org.apache.flink.client.program.rest.RestClusterClient.getClusterConnectionInfo(RestClusterClient.java:513)
	at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:613)
	... 6 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [60000 milliseconds]
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:255)
	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:259)
	at scala.concurrent.Await$.$anonfun$result$1(package.scala:215)
	at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
	at scala.concurrent.Await$.result(package.scala:142)
	at scala.concurrent.Await.result(package.scala)
	at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderConnectionInfo(LeaderRetrievalUtils.java:132)
	... 8 more

Further attempts to start the cluster fail in the same way, and the only solution seems to be to clear out the HA information in zookeeper (and the filesystem). After doing this, the cluster starts successfully without running any jobs.
Cluster 2 usually starts up first and succeeds, but we've seen it both ways around. We've also tried swapping the values of cluster-id and path.root for each cluster, which has the same problem.


The problem appears to be that it can't find the leader/dispatcher. Turning the logging up to DEBUG, I can see some suggestion in the jobmanager log that this election has completed successfully:
2019/03/07 10:59:02,781 DEBUG org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  - Grant leadership to contender akka.tcp://[hidden email]:44315/user/dispatcher with session ID fd044f52-b05c-4feb-9ccf-4c8f18ddf18c.
2019/03/07 10:59:08,035 DEBUG org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Dispatcher akka.tcp://[hidden email]:44315/user/dispatcher accepted leadership with fencing token 9ccf4c8f18ddf18cfd044f52b05c4feb. Start recovered jobs.

There don't seem to be any errors in the jobmanager log.

Any advice on how to start these two clusters or suggestions for other avenues to solve or debug would be really gratefully received.

Best regards,

Mark Harris



The information contained in or attached to this email is intended only for the use of the individual or entity to which it is addressed. If you are not the intended recipient, or a person responsible for delivering it to the intended recipient, you are not authorised to and must not disclose, copy, distribute, or retain this message or any part of it. It may contain information which is confidential and/or covered by legal professional or other privilege under applicable law.

The views expressed in this email are not necessarily the views of Centrica plc or its subsidiaries, and the company, its directors, officers or employees make no representation or accept any liability for its accuracy or completeness unless expressly stated to the contrary.

Additional regulatory disclosures may be found here: https://www.centrica.com/privacy-cookies-and-legal-disclaimer#email

PH Jones is a trading name of British Gas Social Housing Limited. British Gas Social Housing Limited (company no: 01026007), British Gas Trading Limited (company no: 03078711), British Gas Services Limited (company no: 3141243), British Gas Insurance Limited (company no: 06608316), British Gas New Heating Limited (company no: 06723244), British Gas Services (Commercial) Limited (company no: 07385984) and Centrica Energy (Trading) Limited (company no: 02877397) are all wholly owned subsidiaries of Centrica plc (company no: 3033654). Each company is registered in England and Wales with a registered office at Millstream, Maidenhead Road, Windsor, Berkshire SL4 5GD.

British Gas Insurance Limited is authorised by the Prudential Regulation Authority and regulated by the Financial Conduct Authority and the Prudential Regulation Authority. British Gas Services Limited and Centrica Energy (Trading) Limited are authorised and regulated by the Financial Conduct Authority. British Gas Trading Limited is an appointed representative of British Gas Services Limited which is authorised and regulated by the Financial Conduct Authority.


The information contained in or attached to this email is intended only for the use of the individual or entity to which it is addressed. If you are not the intended recipient, or a person responsible for delivering it to the intended recipient, you are not authorised to and must not disclose, copy, distribute, or retain this message or any part of it. It may contain information which is confidential and/or covered by legal professional or other privilege under applicable law.

The views expressed in this email are not necessarily the views of Centrica plc or its subsidiaries, and the company, its directors, officers or employees make no representation or accept any liability for its accuracy or completeness unless expressly stated to the contrary.

Additional regulatory disclosures may be found here: https://www.centrica.com/privacy-cookies-and-legal-disclaimer#email

PH Jones is a trading name of British Gas Social Housing Limited. British Gas Social Housing Limited (company no: 01026007), British Gas Trading Limited (company no: 03078711), British Gas Services Limited (company no: 3141243), British Gas Insurance Limited (company no: 06608316), British Gas New Heating Limited (company no: 06723244), British Gas Services (Commercial) Limited (company no: 07385984) and Centrica Energy (Trading) Limited (company no: 02877397) are all wholly owned subsidiaries of Centrica plc (company no: 3033654). Each company is registered in England and Wales with a registered office at Millstream, Maidenhead Road, Windsor, Berkshire SL4 5GD.

British Gas Insurance Limited is authorised by the Prudential Regulation Authority and regulated by the Financial Conduct Authority and the Prudential Regulation Authority. British Gas Services Limited and Centrica Energy (Trading) Limited are authorised and regulated by the Financial Conduct Authority. British Gas Trading Limited is an appointed representative of British Gas Services Limited which is authorised and regulated by the Financial Conduct Authority.