Hi devs and users,
I would like to start the discussion about FLIP-144[1], which will introduce a new native high availability service for Kubernetes. Flink currently provides a ZooKeeper HA service, which is widely used in production environments. It can be integrated into standalone, YARN, and Kubernetes deployments. However, using the ZooKeeper HA service on K8s incurs additional cost, since we need to manage a ZooKeeper cluster. Meanwhile, K8s provides public APIs for leader election[2] and configuration storage (i.e. ConfigMap[3]). We could leverage these features to make running an HA-configured Flink cluster on K8s more convenient. Both standalone-on-K8s and native K8s deployments could benefit from the newly introduced KubernetesHaService.

[1]. https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
[3]. https://kubernetes.io/docs/concepts/configuration/configmap/

Looking forward to your feedback.

Best,
Yang
Thanks for preparing this FLIP, @Yang.

In general, I'm +1 for this new feature. Leveraging Kubernetes's built-in ConfigMap for Flink's HA services should significantly reduce the maintenance overhead compared to deploying a ZK cluster. I think this is an attractive feature for users.

Concerning the proposed design, I have some questions. They might not be problems; I am just trying to understand.

## Architecture
Why does the leader election need two ConfigMaps (`lock for contending leader` and `leader RPC address`)? What happens if the two ConfigMaps are not updated consistently? E.g., a TM learns that a new JM has become leader (`lock for contending leader` updated), but still gets the old leader's address when trying to read `leader RPC address`?

## HA storage > Lock and release
It seems to me that the owner needs to explicitly release the lock so that other peers can write/remove the stored object. What if the previous owner fails to release the lock (e.g., it dies before releasing)? Would there be any problem?

## HA storage > HA data clean up
If the ConfigMap is destroyed on `kubectl delete deploy <ClusterID>`, how is the HA data retained?

Thank you~
Xintong Song

On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <[hidden email]> wrote:
|
This is a very cool feature proposal.

One lesson learned from the ZooKeeper-based HA is that it is overly complicated to have the leader RPC address in a different node than the LeaderLock. Extra code is needed to make sure these converge, and they can be temporarily out of sync.

A much easier design would be to have the RPC address as payload in the lock entry (ZNode in ZK), the same way that the leader fencing token is stored as payload of the lock. For the design above, I think this would mean having a single ConfigMap for both the leader lock and leader RPC address discovery.

This probably serves as a good design principle in general: do not divide information that is updated together over different resources.

Best,
Stephan

On Wed, Sep 16, 2020 at 11:26 AM Xintong Song <[hidden email]> wrote:
|
Hi Xintong and Stephan,

Thanks a lot for your attention to this FLIP. I will address the comments inline.

# Architecture -> One or two ConfigMaps
Both of you are right. One ConfigMap will make the design and implementation easier. Actually, in my POC code, I am using just one ConfigMap (e.g. "k8s-ha-app1-restserver" for the rest server component) for both leader election and storage. Once a JobManager wins the election, it updates the ConfigMap with the leader address and periodically renews the lock annotation to remain the active leader. I will update the FLIP document, including the architecture diagram, to avoid misunderstanding.

# HA storage > Lock and release
This is a valid concern. ZooKeeper ephemeral nodes are deleted automatically by the ZK server when the client times out, which can happen in a bad network environment or when the ZK client crashes unexpectedly. For Kubernetes, we need to implement a similar mechanism. First, when we want to lock a specific key in a ConfigMap, we put the owner identity, lease duration, and renew time in the ConfigMap annotation. The annotation is cleaned up when the lock is released. When we want to remove a job graph or checkpoint, one of the following conditions must hold; otherwise, the delete operation cannot be performed.
* The current instance is the owner of the key.
* The owner annotation is empty, which means the owner has released the lock.
* The owner annotation has timed out, which usually indicates that the owner died.

# HA storage > HA data clean up
Sorry that I did not describe clearly how the HA-related ConfigMaps are retained. Thanks to the Kubernetes OwnerReference[1], we set the JobManager Deployment as the owner of the flink-conf ConfigMap, the service, and the TaskManager pods. So when we want to destroy a Flink cluster, we just need to delete the deployment[2]. For the HA-related ConfigMaps, we do not set an owner, so they are retained even when we delete the whole Flink cluster.
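The delete conditions listed under "Lock and release" can be sketched roughly as follows. This is only an illustration in plain Python, not Flink or Kubernetes client code: the `LockAnnotation` class, its field names, and `can_delete` are hypothetical names chosen here to mirror the owner identity, lease duration, and renew time mentioned above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class LockAnnotation:
    """Hypothetical lock record kept in a ConfigMap annotation."""
    owner: str             # identity of the instance holding the lock
    lease_duration: float  # seconds the lock stays valid without renewal
    renew_time: float      # timestamp (seconds) of the last renewal


def can_delete(lock: Optional[LockAnnotation], me: str, now: float) -> bool:
    """Return True if instance `me` may delete the key guarded by `lock`."""
    if lock is None:
        # Annotation is empty: the previous owner released the lock.
        return True
    if lock.owner == me:
        # The current instance is the owner of the key.
        return True
    # Lease expired: the owner most likely died without releasing the lock.
    return now - lock.renew_time > lock.lease_duration
```

A standby instance would call `can_delete` before removing a job graph or checkpoint entry and skip the removal when it returns `False`.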
Best,
Yang

On Wed, Sep 16, 2020 at 8:16 PM Stephan Ewen <[hidden email]> wrote:
|
Thanks for creating this FLIP, Yang Wang. I believe that many of our users will like a ZooKeeper-less HA setup.

+1 for not separating the leader information and the leader election if possible. Maybe it is even possible for the contender to write its leader information directly when trying to obtain the leadership by performing a versioned write operation.

Concerning the lock and release operation, I have a question: Can there be multiple owners for a given key-value pair in a ConfigMap? If not, how can we ensure that the node which writes its ownership is actually the leader w/o transactional support from K8s? In ZooKeeper we had the same problem (we should probably change it at some point to simply use a transaction which checks whether the writer is still the leader) and therefore introduced the ephemeral lock nodes. They allow multiple owners of a given ZNode at a time. The last owner is then responsible for cleaning up the node.

I see the benefit of your proposal over the StatefulSet proposal because it can support multiple standby JMs. Given the problem of locking key-value pairs, it might be simpler to start with the StatefulSet approach, where we only have a single JM. This might already add a lot of benefits for our users. Was there a specific reason why you discarded this proposal (other than generality)?

@Uce it would be great to hear your feedback on the proposal since you already implemented a K8s-based HA service.

Cheers,
Till

On Thu, Sep 17, 2020 at 5:06 AM Yang Wang <[hidden email]> wrote:
|
Hi Till, thanks for your valuable feedback.

1. Yes, leader election and storing the leader information will use the same ConfigMap. When a contender successfully performs a versioned annotation update on the ConfigMap, it has been elected as the leader. It will then write the leader information in the callback of the leader elector[1]. The Kubernetes resource version helps us prevent the leader ConfigMap from being wrongly updated.

2. The lock and release is indeed a valid concern. In the current design, we cannot guarantee that the node which tries to write its ownership is the real leader. Whoever writes last becomes the owner. To address this issue, we need to store all the owners of the key. Only when the owner list is empty can the specific key (i.e. a checkpoint or job graph) be deleted. However, we may be left with a residual checkpoint or job graph when the old JobManager crashes unexpectedly and does not release the lock. To solve this problem completely, we need a timestamp renewal mechanism for CompletedCheckpointStore and JobGraphStore, which would let us detect the JobManager timeout and then clean up the residual keys.

3. Frankly speaking, I am not against this solution. However, in my opinion, it is more of a temporary proposal. We could use a StatefulSet to avoid leader election and leader retrieval. But I am not sure whether the TaskManager could properly handle the situation where the same hostname resolves to different IPs because the JobManager failed and was relaunched. Also, we may still have two JobManagers running in some corner cases (e.g. the kubelet is down but the pod is running). Another concern is that the FileSystemHAService has a strong dependency on a PersistentVolume (aka PV), which is not always available, especially in self-built Kubernetes clusters. Moreover, the PV provider should guarantee that each PV can only be mounted once. Since the native HA proposal covers all the functionality of the StatefulSet proposal, I prefer the former.
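To illustrate point 1, here is a minimal sketch of how a versioned write prevents two contenders from both winning the election. It uses an in-memory stand-in rather than a real Kubernetes client: `FakeConfigMap`, `Conflict`, `try_acquire`, and the `"leader"` annotation key are all hypothetical names, and lease expiry is deliberately left out.

```python
class Conflict(Exception):
    """Raised when the caller's resource version is stale."""


class FakeConfigMap:
    """In-memory stand-in for a ConfigMap; not a real Kubernetes client."""

    def __init__(self):
        self.resource_version = 0
        self.annotations = {}

    def read(self):
        # Return the version together with a copy of the annotations.
        return self.resource_version, dict(self.annotations)

    def replace(self, expected_version, annotations):
        # Reject the write if someone else updated the object in between.
        if expected_version != self.resource_version:
            raise Conflict()
        self.annotations = dict(annotations)
        self.resource_version += 1


def try_acquire(cm, contender):
    """Try to become leader with a single versioned write."""
    version, annotations = cm.read()
    if annotations.get("leader"):
        return False  # someone already holds the lock
    annotations["leader"] = contender
    try:
        cm.replace(version, annotations)
        return True
    except Conflict:
        return False  # lost the race to another contender
```

If two contenders read the same version, only the first `replace` succeeds; the second one fails with `Conflict` and has to re-read before trying again.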
Best,
Yang

On Mon, Sep 28, 2020 at 9:29 PM Till Rohrmann <[hidden email]> wrote:
|
For 1. I was wondering whether we can't write the leader connection information directly when trying to obtain the leadership (trying to update the leader key with one's own value)? This might be a little detail, though. 2. Alright, so we are having a similar mechanism as we have in ZooKeeper with the ephemeral lock nodes. I guess that this complicates the implementation a bit, unfortunately. 3. Wouldn't the StatefulSet solution also work without a PV? One could configure a different persistent storage like HDFS or S3 for storing the checkpoints and job blobs like in the ZooKeeper case. The current benefit I see is that we avoid having to implement this multi locking mechanism in the ConfigMaps using the annotations because we can be sure that there is only a single leader at a time if I understood the guarantees of K8s correctly. Cheers, Till On Tue, Sep 29, 2020 at 8:10 AM Yang Wang <[hidden email]> wrote:
|
Hi,

Generally +1 for a native K8s HA service.

Regarding leader election and publishing leader information, a previous discussion[1] pointed out that since these two actions are NOT atomic, there will always be edge cases where a previous leader overwrites the leader information, even with versioned writes. A versioned write only forces a re-read when the version mismatches, so for versioned writes to work, the information in the kv pair should let the contender determine whether it is the current leader.

The idea of writing the leader information on the contender node or something equivalent makes sense, but the details depend on how it is implemented. General problems are:

1. The TM might be a bit late in picking up the correct leader information, but as long as the leader election process is short and leadership is stable most of the time, this won't be a serious issue.
2. The process by which the TM extracts leader information might be a bit more complex than directly watching a fixed key.

The atomicity issue can be addressed if one leverages low-level APIs such as lease & txn, but that requires more development effort. ConfigMap and the encapsulated interface, though, provide only a self-consistent mechanism, which doesn't promise more consistency for extensions.

Best,
tison.

On Tue, Sep 29, 2020 at 9:25 PM Till Rohrmann <[hidden email]> wrote:
Thanks Till and tison for your comments.

1. I am afraid we cannot do this if we are going to use the fabric8 Kubernetes client SDK for the leader election. The official Kubernetes Java client[1] does not support it either. The only option would be to implement a new LeaderElector in Flink based on the very basic Kubernetes API, but it seems that we would not gain much from this.

2. Yes, the implementation will be a little complicated if we want to completely eliminate the residual job graphs or checkpoints. Inspired by your suggestion, a different solution has come to mind. We could use the same ConfigMap to store the JobManager leader, job graph, checkpoint counter, and checkpoints. Each job will have a specific ConfigMap for the HA meta storage. Then it will be easier to guarantee that only the leader can write the ConfigMap, since “Get(check the leader)-and-Update(write back to the ConfigMap)” is a transactional operation.

3. Yes, StatefulSet(1) + ConfigMap + HDFS/S3 is also a solution. However, there is still a chance that two JobManagers are running and trying to get/delete a key in the same ConfigMap concurrently. Imagine that the kubelet (like the NodeManager in YARN) is down, so the JobManager cannot be deleted. A new JobManager pod will be launched. We are then in a similar situation as with Deployment(1) + ConfigMap + HDFS/S3. The only benefit is that we do not need to implement a leader election/retrieval service.

@tison Actually, I do not think we will have such an issue in the Kubernetes HA service. In the Kubernetes LeaderElector[2], the leader information is stored in the annotation of the leader ConfigMap, so the old leader cannot wrongly overwrite the leader information. When a JobManager wants to write its leader information to the ConfigMap, it first checks whether it is currently the leader. If not, nothing will happen.
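The "Get(check the leader)-and-Update" step from point 2 could look roughly like the sketch below. It is a single-process simulation, not the fabric8 or official client API: `JobConfigMap` and `update_if_leader` are hypothetical names, and in a real cluster the version comparison would be enforced by the API server rather than by the caller.

```python
from dataclasses import dataclass, field


@dataclass
class JobConfigMap:
    """In-memory stand-in for a per-job HA ConfigMap (hypothetical layout)."""
    leader: str = ""                           # leader annotation
    data: dict = field(default_factory=dict)   # job graph / checkpoint pointers
    resource_version: int = 0


def update_if_leader(cm, writer, seen_version, key, value):
    """Write key=value only if `writer` is still the leader and the ConfigMap
    has not changed since `seen_version` was read. Returns True on success."""
    if cm.leader != writer:
        return False  # not the leader: do nothing
    if cm.resource_version != seen_version:
        return False  # concurrent update: caller must re-read and retry
    cm.data[key] = value
    cm.resource_version += 1
    return True
```

Because the leader annotation and the HA data live in the same object, a single version check covers both, which is what makes the combined check-and-write safe against a stale leader.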
Moreover, the Kubernetes Resource Version[3] ensures that no one else has snuck in and written a different update while the client was in the process of performing its update.

Best,
Yang

On Wed, Sep 30, 2020 at 3:21 PM tison <[hidden email]> wrote:
|
Thanks for your explanation. That is fine as long as checking leadership and actually writing the information are atomic.

Best,
tison.

On Wed, Sep 30, 2020 at 3:57 PM Yang Wang <[hidden email]> wrote:
|
Thanks for the clarifications Yang Wang. 2. Keeping the HA information relevant for a component (Dispatcher, JobManager, ResourceManager) in a single ConfigMap sounds good. We should check that we don't exceed the 1 MB size limit with this approach though. The Dispatcher's ConfigMap would then contain the current leader, the running jobs and the pointers to the persisted JobGraphs. The JobManager's ConfigMap would then contain the current leader, the pointers to the checkpoints and the checkpoint ID counter, for example. 3. Ah ok, I somehow thought that K8s would give us stronger guarantees than Yarn in this regard. That's a pity. Cheers, Till On Wed, Sep 30, 2020 at 10:03 AM tison <[hidden email]> wrote:
|
2. Yes, this is exactly what I mean: storing the HA information relevant to a specific component in a single ConfigMap and ensuring that “Get(check the leader)-and-Update(write back to the ConfigMap)” is a transactional operation. Since we only store the job graph stateHandler (not the real data) in the ConfigMap, I think 1 MB is big enough for the dispatcher-leader ConfigMap (the biggest one, with multiple jobs). By my rough calculation, we could have more than 1000 Flink jobs in a Flink session cluster.

3. Actually, K8s has a stronger guarantee than YARN. A StatefulSet provides at-most-one semantics as long as no manual force-deletion happens[1]. Based on the previous discussion, we have successfully avoided the "lock-and-release" in the implementation. So I still insist on using the current Deployment.

Best,
Yang

On Wed, Sep 30, 2020 at 11:57 PM Till Rohrmann <[hidden email]> wrote:
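The rough capacity estimate in point 2 of the message above can be reproduced with a back-of-the-envelope calculation. The per-entry size is an assumption made here purely for illustration (the thread does not state it); only the 1 MB ConfigMap limit comes from the discussion.

```python
CONFIGMAP_LIMIT_BYTES = 1024 * 1024  # 1 MiB size limit discussed in the thread
ASSUMED_ENTRY_BYTES = 1000           # assumption: one serialized state handle pointer


def max_jobs(limit_bytes=CONFIGMAP_LIMIT_BYTES, entry_bytes=ASSUMED_ENTRY_BYTES):
    """Number of job entries that fit, ignoring keys and other metadata."""
    return limit_bytes // entry_bytes
```

Under this assumption `max_jobs()` comes out slightly above 1000, consistent with the estimate in the message; larger state handle pointers would reduce the number proportionally.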
|
3. We could avoid force deletions from within Flink. If the user does it, then we don't give guarantees. I am fine with your current proposal. +1 for moving forward with it. Cheers, Till On Thu, Oct 1, 2020 at 2:32 AM Yang Wang <[hidden email]> wrote:
|
3. Makes sense to me. We could also add a new HA solution "StatefulSet + PV + FileSystem" at any time in the future if we need it.

Since there are no more open questions, I will start the voting now. Thanks all for your comments and feedback. Feel free to continue the discussion if you have other concerns.

Best,
Yang

On Thu, Oct 1, 2020 at 4:52 PM Till Rohrmann <[hidden email]> wrote: