I’m running Flink 1.5.0 in Kubernetes with HA enabled, but with only a single JobManager. I’m using ZooKeeper to store the fencing/leader information and S3 to store the JobManager state. We’re running around 250 streaming jobs, and we’ve noticed that if the JobManager pod is deleted, it takes 20-45 minutes for the JobManager’s REST endpoints and web UI to become available again. Until then, the HTTP server returns a 503 response with the message "Could not retrieve the redirect address of the current leader. Please try to refresh."
Has anyone else run into this? Are there any configuration settings I should be looking at to speed up the availability of the HTTP endpoints?

Thanks!
-Joey
Sorry to ping my own thread, but has anyone else encountered this?
-Joey

> On Jul 30, 2018, at 11:10 AM, Joey Echeverria wrote:
> I’m running Flink 1.5.0 in Kubernetes with HA enabled, but only a single Job Manager running. [...]
Hi Joey,

Currently, the REST endpoints are hosted in the JobManager (JM). Your scenario is a JM failover while the cluster is running a large number of jobs. First, ZooKeeper needs some time to conduct the leader election. Then the JM has to wait for the TaskManagers (TMs) to register, and all of those jobs have to be restored and started again. This can take a long time, and during that period the JM can be busy enough that the web services are unresponsive or slow to respond.

That said, 20-45 minutes is really long, so you should first confirm what causes it. For example, if you cut the number of jobs in the cluster in half, does the web UI become available much faster?

Thanks,
vino.

2018-08-02 3:39 GMT+08:00 Joey Echeverria wrote:
> Sorry to ping my own thread, but has anyone else encountered this?
Hi Joey,
If the other components (e.g., Dispatcher, ResourceManager) are able to finish the leader election in a timely manner, I currently do not see a reason why it should take the REST server 20-45 minutes. You can check the contents of the znode /flink/.../leader/rest_server_lock to see whether there is indeed no leader, or whether the leader information cannot be retrieved from ZooKeeper.

If you can reproduce this in a staging environment with some test jobs, I'd like to see the ClusterEntrypoint/JobManager logs (preferably at DEBUG level).

Best,
Gary

On Mon, Jul 30, 2018 at 8:10 PM, Joey Echeverria wrote:
> I’m running Flink 1.5.0 in Kubernetes with HA enabled, but only a single Job Manager running. [...]
Thanks for the tips, Gary and Vino. I’ll try to reproduce it with test data and see if I can post some logs.
I’ll also watch the leader znode to see whether the election isn’t happening or the leader information isn’t being retrieved.
Thanks!
-Joey
|
I don’t have logs available yet, but I do have some information from ZK.
The culprit appears to be the /flink/default/leader/dispatcher_lock znode.
I took a look at the dispatcher code here: https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L762-L785
It looks to me like, when leadership is granted, the Dispatcher recovers all jobs before it writes the new leader information to the /flink/default/leader/dispatcher_lock znode.
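The ordering described here can be captured in a toy model: between the moment leadership is granted and the moment the leader information is written, every REST request finds no leader and gets the 503. This is a hypothetical, single-threaded Python sketch; `LeaderStore`, `grant_leadership`, and `rest_status` are invented names that only stand in for the znode, the Dispatcher, and the REST handler, and the real Dispatcher does this work asynchronously.

```python
from typing import Callable, Iterable, Optional, Tuple


class LeaderStore:
    """Stand-in for the dispatcher_lock znode (hypothetical, not a Flink class)."""

    def __init__(self) -> None:
        self.leader_address: Optional[str] = None


def rest_status(store: LeaderStore) -> Tuple[int, str]:
    # The REST handler can only redirect once a leader address has been published.
    if store.leader_address is None:
        return 503, ("Could not retrieve the redirect address of the current "
                     "leader. Please try to refresh.")
    return 200, store.leader_address


def grant_leadership(store: LeaderStore, job_ids: Iterable[str],
                     recover_job: Callable[[str], None], address: str) -> None:
    # Simplified ordering: recover every persisted job first,
    # and only then publish the leader information.
    for job_id in job_ids:
        recover_job(job_id)
    store.leader_address = address


# Usage: record what a REST client would see while each job is being recovered.
store = LeaderStore()
seen = []
grant_leadership(store, ["job-1", "job-2", "job-3"],
                 lambda job_id: seen.append(rest_status(store)[0]),
                 "leader-address")  # placeholder address
```

In this model the 503 window grows with the number of jobs and the per-job recovery cost, which is consistent with the behavior reported in this thread.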
So this leaves me with three questions:
1) Why does the web monitor specifically have to wait for the dispatcher?
2) Is there a reason why the dispatcher can’t write the lock before job recovery completes?
3) Is there anything I can/should be doing to speed up job recovery?
Thanks!
-Joey
|
Hi Joey,

Good question! I'm copying Till and Chesnay, who know this part of the implementation.

Thanks,
vino.

2018-08-03 11:09 GMT+08:00 Joey Echeverria wrote:
|
Hi Joey,

your analysis is correct. Currently, the Dispatcher first tries to recover all jobs before it confirms its leadership.

1) The Dispatcher provides much of the relevant information you see in the web UI. Without a leading Dispatcher, the web UI cannot show much. However, this could be changed so that the web UI is still served when no Dispatcher is the leader, and only the information that depends on it (number of running jobs, job details, etc.) cannot be displayed. Could you create a JIRA issue to fix this problem?

2) The Dispatcher recovers the jobs before confirming its leadership because it needs to restore its internal state before it becomes accessible to other components, which could change that state. For example, the following problem could arise: assume that you submit a job to the cluster. The cluster receives the JobGraph and persists it in ZooKeeper, but the Dispatcher fails before it can acknowledge the submission. The client sees the failure and re-submits the job. Meanwhile, the Dispatcher is restarted and starts recovering the persisted jobs. If we didn't wait for the recovery to complete, the retried submission could succeed first simply because it is faster. This would, however, make the job recovery fail, because the Dispatcher would already be executing this job (due to the re-submission), whereas the assumption is that recovered jobs are submitted first. The same applies if you submit a modified job with the same JobID as a persisted job: which job should the system execute, the old one or the newly submitted one? By completing the recovery first, we give precedence to the persisted jobs.

One could also solve this problem slightly differently, by only blocking job submissions while a recovery is in progress. However, one would have to check that no other RPCs change the internal state in a way that interferes with the job recovery. Could you open a JIRA issue for this problem as well?
3) Job recovery is mainly limited by the connection to the persistent storage system (HDFS or S3, I assume) where the JobGraphs are stored. Alternatively, you could split the jobs across multiple Flink clusters to decrease the number of jobs that need to be recovered after a failure.

Thanks a lot for reporting and analysing this problem. This is definitely something we should improve!

Cheers,
Till

On Fri, Aug 3, 2018 at 5:48 AM vino yang wrote:
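The alternative Till mentions under 2), blocking only job submissions while recovery runs so that leadership could be confirmed right away, can be illustrated with a toy model. This is a hypothetical Python sketch, not Flink's Dispatcher API: `ToyDispatcher`, `recover`, `submit`, and `list_jobs` are invented names, and the sketch ignores the other state-changing RPCs Till warns would also need checking.

```python
import threading


class ToyDispatcher:
    """Hypothetical model: leadership is usable immediately, but job
    submissions wait until persisted jobs have been recovered."""

    def __init__(self):
        self._lock = threading.Lock()
        self._recovery_done = threading.Event()
        self.running = {}  # job_id -> job graph currently executing

    def recover(self, persisted_jobs):
        # Persisted jobs are started first, so they take precedence over any
        # re-submission that arrives while recovery is in progress.
        with self._lock:
            for job_id, graph in persisted_jobs.items():
                self.running[job_id] = graph
        self._recovery_done.set()

    def submit(self, job_id, graph, timeout=None):
        # Block submissions (but not read-only queries) until recovery is done.
        if not self._recovery_done.wait(timeout):
            raise TimeoutError("job recovery still in progress")
        with self._lock:
            if job_id in self.running:
                return False  # duplicate JobID: the recovered job wins
            self.running[job_id] = graph
            return True

    def list_jobs(self):
        # Read-only queries (what the web UI needs) are served at any time.
        with self._lock:
            return sorted(self.running)


# Usage: a client retries a submission while the Dispatcher is still recovering.
d = ToyDispatcher()
result = {}
client = threading.Thread(target=lambda: result.update(ok=d.submit("job-1", "resubmitted")))
client.start()
jobs_during_recovery = d.list_jobs()  # answered without waiting for recovery
d.recover({"job-1": "persisted", "job-2": "persisted"})
client.join(timeout=5)
```

The retried submission of `job-1` is rejected because the persisted copy was recovered first, while `list_jobs` stays available throughout; that availability is what would let the web UI respond during recovery.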
|
Hi Joey,

Did you create these JIRA issues based on Till's suggestion? If you haven't created them yet or aren't sure how, I can do it for you, but I'll wait a while first.

Thanks,
vino.

2018-08-03 17:23 GMT+08:00 Till Rohrmann wrote:
|
Thanks for the ping Vino.
I created two JIRAs for the first two items:
Regarding (3), we’re testing different options for the state storage. I’ll report back if we find anything significant.
-Joey
|
Hi Joey,

Thank you for finding these issues and filing them.

Thanks,
vino.

2018-08-07 8:18 GMT+08:00 Joey Echeverria wrote:
|
I wanted to follow up on this thread one last time as we found a solution for the recovery time that worked well for us.
Originally, we ran each job from a jar that shaded in all of our dependencies. We switched to a lightweight jar containing only the job itself and added the dependency jar to the class path as a separate element. That sped up recovery significantly, to about 1 minute for 250 jobs.
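Taking the numbers in this thread at face value (250 jobs; 20-45 minutes before the jar change, about 1 minute after), the implied per-job recovery cost, and the effect of Till's other suggestion of splitting jobs across clusters, can be worked out as below. This is back-of-the-envelope Python with invented helper names; it assumes recovery time is spread evenly across jobs and that each cluster recovers its jobs one at a time, neither of which the thread establishes.

```python
import math


def per_job_seconds(total_minutes: float, num_jobs: int) -> float:
    # Average recovery cost per job, assuming time is spread evenly over jobs.
    return total_minutes * 60 / num_jobs


def estimated_recovery_seconds(num_jobs: int, per_job: float, clusters: int = 1) -> float:
    # Assumes each cluster recovers its share of jobs sequentially and that
    # separate clusters recover in parallel.
    return math.ceil(num_jobs / clusters) * per_job


before = (per_job_seconds(20, 250), per_job_seconds(45, 250))  # about 4.8 s and 10.8 s per job
after = per_job_seconds(1, 250)                                 # about 0.24 s per job
# Splitting the same 250 jobs across 5 clusters, at the old per-job cost (minutes):
split = [estimated_recovery_seconds(250, p, clusters=5) / 60 for p in before]  # about 4 and 9
```

By this rough estimate, the thinner job jar cut the per-job cost by a factor of roughly 20-45, more than splitting the jobs across five clusters would have achieved on its own.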
If anyone else runs into this, it’s something worth trying.
-Joey
|