[DISCUSS] FLIP-144: Native Kubernetes HA for Flink

[DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Yang Wang
Hi devs and users,

I would like to start the discussion about FLIP-144[1], which will introduce
a new native high availability service for Kubernetes.

Currently, Flink provides a ZooKeeper HA service, which is widely used
in production environments. It can be integrated into standalone cluster,
YARN, and Kubernetes deployments. However, using ZooKeeper HA on K8s
incurs additional cost, since we need to manage a ZooKeeper cluster.
Meanwhile, K8s provides public APIs for leader election[2]
and configuration storage (i.e. ConfigMap[3]). We could leverage these
features to make running an HA-configured Flink cluster on K8s more convenient.

Both standalone on K8s and native K8s deployments could benefit from the newly introduced KubernetesHaService.

[1]. https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
[3]. https://kubernetes.io/docs/concepts/configuration/configmap/

Looking forward to your feedback.

Best,
Yang

Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Xintong Song
Thanks for preparing this FLIP, @Yang.

In general, I'm +1 for this new feature. Leveraging Kubernetes' built-in ConfigMap for Flink's HA services should significantly reduce the maintenance overhead compared to deploying a ZK cluster. I think this is an attractive feature for users.

Concerning the proposed design, I have some questions. They might not be problems; I am just trying to understand.

## Architecture

Why does the leader election need two ConfigMaps (`lock for contending leader`, and `leader RPC address`)? What happens if the two ConfigMaps are not updated consistently? E.g., a TM learns about a new JM becoming leader (lock for contending leader updated), but still gets the old leader's address when trying to read `leader RPC address`?

## HA storage > Lock and release

It seems to me that the owner needs to explicitly release the lock so that other peers can write/remove the stored object. What if the previous owner fails to release the lock (e.g., it dies before releasing)? Would there be any problem?

## HA storage > HA data clean up

If the ConfigMap is destroyed on `kubectl delete deploy <ClusterID>`, how is the HA data retained?


Thank you~

Xintong Song



On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <[hidden email]> wrote:
Hi devs and users,

I would like to start the discussion about FLIP-144[1], which will introduce
a new native high availability service for Kubernetes.

Currently, Flink provides a ZooKeeper HA service, which is widely used
in production environments. It can be integrated into standalone cluster,
YARN, and Kubernetes deployments. However, using ZooKeeper HA on K8s
incurs additional cost, since we need to manage a ZooKeeper cluster.
Meanwhile, K8s provides public APIs for leader election[2]
and configuration storage (i.e. ConfigMap[3]). We could leverage these
features to make running an HA-configured Flink cluster on K8s more convenient.

Both standalone on K8s and native K8s deployments could benefit from the newly introduced KubernetesHaService.

[1]. https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
[3]. https://kubernetes.io/docs/concepts/configuration/configmap/

Looking forward to your feedback.

Best,
Yang

Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Stephan Ewen
This is a very cool feature proposal.

One lesson learned from the ZooKeeper-based HA is that it is overly complicated to keep the leader RPC address in a different node than the LeaderLock. Extra code is needed to make sure the two converge, and they can be temporarily out of sync.

A much easier design would be to have the RPC address as payload in the lock entry (ZNode in ZK), the same way that the leader fencing token is stored as payload of the lock.
I think for the design above it would mean having a single ConfigMap for both leader lock and leader RPC address discovery. 

This is probably a good design principle in general: do not split information that is updated together across different resources.
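
A minimal sketch of that principle, not taken from Flink or ZooKeeper: the lock entry carries the leader's RPC address and fencing token as its payload, so a reader can never see a new lock holder paired with an old address. `LeaderRecord` and `LockEntry` are hypothetical names, and the store is a plain in-memory object.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class LeaderRecord:
    """Everything that changes together on a leader change (hypothetical type)."""
    holder_id: str
    rpc_address: str
    fencing_token: int


class LockEntry:
    """One resource that stores the lock and its payload together."""

    def __init__(self) -> None:
        self._record: Optional[LeaderRecord] = None
        self._next_token = 1

    def try_acquire(self, holder_id: str, rpc_address: str) -> bool:
        # Lock, address, and token are written in one step, so they cannot diverge.
        if self._record is not None:
            return False
        self._record = LeaderRecord(holder_id, rpc_address, self._next_token)
        self._next_token += 1
        return True

    def release(self, holder_id: str) -> None:
        # Only the current holder may release the lock.
        if self._record is not None and self._record.holder_id == holder_id:
            self._record = None

    def read(self) -> Optional[LeaderRecord]:
        return self._record
```

A reader that calls `read()` gets either no leader at all or a complete, self-consistent record.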

Best,
Stephan


On Wed, Sep 16, 2020 at 11:26 AM Xintong Song <[hidden email]> wrote:
Thanks for preparing this FLIP, @Yang.

In general, I'm +1 for this new feature. Leveraging Kubernetes' built-in ConfigMap for Flink's HA services should significantly reduce the maintenance overhead compared to deploying a ZK cluster. I think this is an attractive feature for users.

Concerning the proposed design, I have some questions. They might not be problems; I am just trying to understand.

## Architecture

Why does the leader election need two ConfigMaps (`lock for contending leader`, and `leader RPC address`)? What happens if the two ConfigMaps are not updated consistently? E.g., a TM learns about a new JM becoming leader (lock for contending leader updated), but still gets the old leader's address when trying to read `leader RPC address`?

## HA storage > Lock and release

It seems to me that the owner needs to explicitly release the lock so that other peers can write/remove the stored object. What if the previous owner fails to release the lock (e.g., it dies before releasing)? Would there be any problem?

## HA storage > HA data clean up

If the ConfigMap is destroyed on `kubectl delete deploy <ClusterID>`, how is the HA data retained?


Thank you~

Xintong Song




Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Yang Wang
Hi Xintong and Stephan,

Thanks a lot for your attention to this FLIP. I will address the comments inline.

# Architecture -> One or two ConfigMaps

Both of you are right. One ConfigMap will make the design and implementation easier. Actually, in my POC code,
I am using just one ConfigMap (e.g. "k8s-ha-app1-restserver" for the rest server component) for both leader election
and storage. Once a JobManager wins the election, it updates the ConfigMap with the leader address and periodically
renews the lock annotation to remain the active leader. I will update the FLIP document, including the architecture diagram,
to avoid misunderstanding.
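
As a rough illustration of the single-ConfigMap flow (not the POC code itself), the sketch below models the ConfigMap in memory. The annotation keys `holder` and `renewTime`, the 15-second lease, and the function name are assumptions made for the example; a real implementation would also guard each update with the ConfigMap's resource version.

```python
from dataclasses import dataclass, field
from typing import Dict

LEASE_SECONDS = 15.0  # assumed lease duration


@dataclass
class ConfigMap:
    """In-memory stand-in for a Kubernetes ConfigMap (not a client object)."""
    annotations: Dict[str, str] = field(default_factory=dict)
    data: Dict[str, str] = field(default_factory=dict)


def try_acquire_or_renew(cm: ConfigMap, me: str, address: str, now: float) -> bool:
    """Become or stay leader if the lock is free, expired, or already ours."""
    holder = cm.annotations.get("holder")
    renew_time = float(cm.annotations.get("renewTime", "0"))
    expired = now - renew_time > LEASE_SECONDS
    if holder not in (None, me) and not expired:
        return False  # another JobManager holds a live lease
    # The lock annotation and the leader address live in the same ConfigMap.
    cm.annotations["holder"] = me
    cm.annotations["renewTime"] = str(now)
    cm.data["address"] = address
    return True
```

The active leader would call `try_acquire_or_renew` periodically; a standby takes over only once the renew time is older than the lease.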


# HA storage > Lock and release

This is a valid concern. ZooKeeper ephemeral nodes are deleted automatically by the ZK server when
the client times out, which can happen in a bad network environment or when the ZK client crashes unexpectedly. For Kubernetes,
we need to implement a similar mechanism. First, when we want to lock a specific key in a ConfigMap, we put the owner identity,
lease duration, and renew time in the ConfigMap annotation. The annotation is cleaned up when the lock is released. When
we want to remove a job graph or checkpoint, at least one of the following conditions must hold. Otherwise, the delete operation cannot be performed.
* The current instance is the owner of the key.
* The owner annotation is empty, which means the owner has released the lock.
* The owner annotation has timed out, which usually indicates that the owner died.
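
The three conditions can be read as a single predicate. The sketch below is only an illustration: the function name and its parameters are hypothetical, and the owner, renew time, and lease duration are assumed to have been read from the ConfigMap annotation beforehand.

```python
from typing import Optional


def can_delete(owner: Optional[str], renew_time: float, lease_seconds: float,
               me: str, now: float) -> bool:
    """Return True if the key may be deleted by instance `me` at time `now`."""
    if not owner:
        return True  # annotation empty: the owner released the lock
    if owner == me:
        return True  # the current instance owns the key
    # Otherwise deletion is allowed only once the owner's lease has timed out.
    return now - renew_time > lease_seconds
```

For example, `can_delete("jm-2", 0.0, 15.0, "jm-1", now=10.0)` is `False` because another owner still holds a live lease.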


# HA storage > HA data clean up

Sorry that I did not clearly describe how the HA-related ConfigMaps are retained. Thanks to the Kubernetes OwnerReference[1],
we set the owner of the flink-conf ConfigMap, service, and TaskManager pods to the JobManager Deployment. So when we want to
destroy a Flink cluster, we just need to delete the deployment[2]. For the HA-related ConfigMaps, we do not set an owner,
so they are retained even when we delete the whole Flink cluster.
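
To make the retention rule concrete, here is a small simulation of owner-based cascading deletion. It does not call Kubernetes; `Resource` and `delete_cascading` are hypothetical names, and all resource names except "k8s-ha-app1-restserver" are invented for the example.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class Resource:
    name: str
    owner: Optional[str] = None  # name of the owning resource, if any


def delete_cascading(resources: Dict[str, Resource], name: str) -> None:
    """Delete `name` and, recursively, every resource that it owns."""
    resources.pop(name, None)
    owned = [r.name for r in resources.values() if r.owner == name]
    for child in owned:
        delete_cascading(resources, child)


cluster = {r.name: r for r in [
    Resource("app1"),                                # JobManager Deployment
    Resource("flink-config-app1", owner="app1"),     # flink-conf ConfigMap
    Resource("app1-rest", owner="app1"),             # service
    Resource("app1-taskmanager-1", owner="app1"),    # TaskManager pod
    Resource("k8s-ha-app1-restserver"),              # HA ConfigMap, no owner
]}
delete_cascading(cluster, "app1")
remaining = sorted(cluster)
```

After deleting the Deployment, only the HA ConfigMap remains, because nothing owns it.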




Best,
Yang


On Wed, Sep 16, 2020 at 8:16 PM Stephan Ewen <[hidden email]> wrote:
This is a very cool feature proposal.

One lesson learned from the ZooKeeper-based HA is that it is overly complicated to keep the leader RPC address in a different node than the LeaderLock. Extra code is needed to make sure the two converge, and they can be temporarily out of sync.

A much easier design would be to have the RPC address as payload in the lock entry (ZNode in ZK), the same way that the leader fencing token is stored as payload of the lock.
I think for the design above it would mean having a single ConfigMap for both leader lock and leader RPC address discovery. 

This is probably a good design principle in general: do not split information that is updated together across different resources.

Best,
Stephan



Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Till Rohrmann
Thanks for creating this FLIP, Yang Wang. I believe that many of our users will like a ZooKeeper-less HA setup.

+1 for not separating the leader information and the leader election if possible. Maybe the contender could even write its leader information directly when trying to obtain leadership, by performing a versioned write operation.
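
A versioned write of this kind could look roughly like the sketch below. It assumes optimistic concurrency in the style of the Kubernetes resource version, where an update carrying a stale version is rejected with a conflict; `VersionedStore`, `ConflictError`, and `try_become_leader` are hypothetical names and the store is in memory only.

```python
from typing import Dict, Tuple


class ConflictError(Exception):
    """Raised when the caller's version is stale."""


class VersionedStore:
    """In-memory stand-in for one ConfigMap guarded by a resource version."""

    def __init__(self) -> None:
        self.version = 0
        self.data: Dict[str, str] = {}

    def read(self) -> Tuple[int, Dict[str, str]]:
        return self.version, dict(self.data)

    def replace(self, expected_version: int, new_data: Dict[str, str]) -> int:
        # The write succeeds only if nobody else wrote since our read.
        if expected_version != self.version:
            raise ConflictError(f"expected {expected_version}, found {self.version}")
        self.data = dict(new_data)
        self.version += 1
        return self.version


def try_become_leader(store: VersionedStore, me: str, address: str) -> bool:
    """Claim leadership and publish the address in one versioned write."""
    version, data = store.read()
    if "leader" in data:
        return False
    try:
        store.replace(version, {"leader": me, "address": address})
        return True
    except ConflictError:
        return False  # a competing contender won the race
```

Because the election and the leader information go through the same write, there is no window in which one is updated without the other.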

Concerning the lock and release operation I have a question: Can there be multiple owners for a given key-value pair in a ConfigMap? If not, how can we ensure that the node that writes its ownership is actually the leader without transactional support from K8s? In ZooKeeper we had the same problem (we should probably change it at some point to simply use a transaction that checks whether the writer is still the leader) and therefore introduced the ephemeral lock nodes. They allow multiple owners of a given ZNode at a time. The last owner is then responsible for cleaning up the node.

I see the benefit of your proposal over the StatefulSet proposal because it can support multiple standby JMs. Given the problem of locking key-value pairs, it might be simpler to start with the StatefulSet approach, where we only have a single JM. This might already add a lot of benefits for our users. Was there a specific reason why you discarded that proposal (other than generality)?

@Uce it would be great to hear your feedback on the proposal since you already implemented a K8s-based HA service.

Cheers,
Till

On Thu, Sep 17, 2020 at 5:06 AM Yang Wang <[hidden email]> wrote:
Hi Xintong and Stephan,

Thanks a lot for your attention to this FLIP. I will address the comments inline.

# Architecture -> One or two ConfigMaps

Both of you are right. One ConfigMap will make the design and implementation easier. Actually, in my POC code,
I am using just one ConfigMap (e.g. "k8s-ha-app1-restserver" for the rest server component) for both leader election
and storage. Once a JobManager wins the election, it updates the ConfigMap with the leader address and periodically
renews the lock annotation to remain the active leader. I will update the FLIP document, including the architecture diagram,
to avoid misunderstanding.


# HA storage > Lock and release

This is a valid concern. ZooKeeper ephemeral nodes are deleted automatically by the ZK server when
the client times out, which can happen in a bad network environment or when the ZK client crashes unexpectedly. For Kubernetes,
we need to implement a similar mechanism. First, when we want to lock a specific key in a ConfigMap, we put the owner identity,
lease duration, and renew time in the ConfigMap annotation. The annotation is cleaned up when the lock is released. When
we want to remove a job graph or checkpoint, at least one of the following conditions must hold. Otherwise, the delete operation cannot be performed.
* The current instance is the owner of the key.
* The owner annotation is empty, which means the owner has released the lock.
* The owner annotation has timed out, which usually indicates that the owner died.


# HA storage > HA data clean up

Sorry that I did not clearly describe how the HA-related ConfigMaps are retained. Thanks to the Kubernetes OwnerReference[1],
we set the owner of the flink-conf ConfigMap, service, and TaskManager pods to the JobManager Deployment. So when we want to
destroy a Flink cluster, we just need to delete the deployment[2]. For the HA-related ConfigMaps, we do not set an owner,
so they are retained even when we delete the whole Flink cluster.




Best,
Yang



Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Yang Wang
Hi Till, thanks for your valuable feedback.

1. Yes, leader election and storing leader information will use the same ConfigMap. When a contender successfully performs a versioned annotation update on the ConfigMap, it has been elected as the leader, and it will write the leader information in the callback of the leader elector[1]. The Kubernetes resource version will help us prevent the leader ConfigMap from being wrongly updated.

2. The lock and release is really a valid concern. Actually, in the current design, we cannot guarantee that the node that tries to write its ownership is the real leader: whoever writes last becomes the owner. To address this issue, we need to store all the owners of the key. Only when the owner list is empty can the specific key (i.e. a checkpoint or job graph) be deleted. However, we may be left with a residual checkpoint or job graph when the old JobManager crashes unexpectedly and does not release the lock. To solve this problem completely, we need a timestamp renewal mechanism for CompletedCheckpointStore and JobGraphStore, which would let us detect a JobManager timeout and then clean up the residual keys.

3. Frankly speaking, I am not against this solution. However, in my opinion, it is more of a temporary proposal. We could use a StatefulSet to avoid leader election and leader retrieval. But I am not sure whether the TaskManager can properly handle the situation where the same hostname resolves to different IPs because the JobManager failed and was relaunched. We may also still have two JobManagers running in some corner cases (e.g. the kubelet is down but the pod is still running). Another concern is that FileSystemHAService has a strong dependency on a PersistentVolume (aka PV), which is not always available, especially in self-built Kubernetes clusters. Moreover, the PV provider must guarantee that each PV can only be mounted once. Since the native HA proposal covers all the functionality of the StatefulSet proposal, I prefer the former.
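
The owner list with timestamp renewal from point 2 could be sketched as follows. This is an illustration only, not the planned CompletedCheckpointStore or JobGraphStore code: `KeyOwners`, its methods, and the timeout value are hypothetical, and the state is kept in memory rather than in a ConfigMap.

```python
from typing import Dict


class KeyOwners:
    """Tracks every owner of one key together with its last renewal time."""

    def __init__(self, timeout: float) -> None:
        self.timeout = timeout
        self.renewed_at: Dict[str, float] = {}

    def lock(self, owner: str, now: float) -> None:
        self.renewed_at[owner] = now

    def renew(self, owner: str, now: float) -> None:
        # Renewing only refreshes the timestamp of an existing owner.
        if owner in self.renewed_at:
            self.renewed_at[owner] = now

    def release(self, owner: str) -> None:
        self.renewed_at.pop(owner, None)

    def deletable(self, now: float) -> bool:
        """Drop owners whose timestamp is stale, then report whether none remain."""
        self.renewed_at = {o: t for o, t in self.renewed_at.items()
                           if now - t <= self.timeout}
        return not self.renewed_at
```

A crashed JobManager stops renewing, so its entry eventually ages out and the residual key becomes deletable.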



Best,
Yang

On Mon, Sep 28, 2020 at 9:29 PM Till Rohrmann <[hidden email]> wrote:
Thanks for creating this FLIP, Yang Wang. I believe that many of our users will like a ZooKeeper-less HA setup.

+1 for not separating the leader information and the leader election if possible. Maybe the contender could even write its leader information directly when trying to obtain leadership, by performing a versioned write operation.

Concerning the lock and release operation I have a question: Can there be multiple owners for a given key-value pair in a ConfigMap? If not, how can we ensure that the node that writes its ownership is actually the leader without transactional support from K8s? In ZooKeeper we had the same problem (we should probably change it at some point to simply use a transaction that checks whether the writer is still the leader) and therefore introduced the ephemeral lock nodes. They allow multiple owners of a given ZNode at a time. The last owner is then responsible for cleaning up the node.

I see the benefit of your proposal over the StatefulSet proposal because it can support multiple standby JMs. Given the problem of locking key-value pairs, it might be simpler to start with the StatefulSet approach, where we only have a single JM. This might already add a lot of benefits for our users. Was there a specific reason why you discarded that proposal (other than generality)?

@Uce it would be great to hear your feedback on the proposal since you already implemented a K8s-based HA service.

Cheers,
Till


Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Till Rohrmann
1. I was wondering whether we could write the leader connection information directly when trying to obtain leadership (i.e. trying to update the leader key with one's own value). This might be a minor detail, though.

2. Alright, so we would have a mechanism similar to the one in ZooKeeper with the ephemeral lock nodes. I guess that this complicates the implementation a bit, unfortunately.

3. Wouldn't the StatefulSet solution also work without a PV? One could configure a different persistent storage, such as HDFS or S3, for storing the checkpoints and job blobs, as in the ZooKeeper case. The current benefit I see is that we avoid having to implement this multi-locking mechanism in the ConfigMaps using annotations, because we can be sure that there is only a single leader at a time, if I understood the guarantees of K8s correctly.

Cheers,
Till

On Tue, Sep 29, 2020 at 8:10 AM Yang Wang <[hidden email]> wrote:
Hi Till, thanks for your valuable feedback.

1. Yes, leader election and storing leader information will use the same ConfigMap. When a contender successfully performs a versioned annotation update on the ConfigMap, it has been elected as the leader, and it will write the leader information in the callback of the leader elector[1]. The Kubernetes resource version will help us prevent the leader ConfigMap from being wrongly updated.

2. The lock and release is really a valid concern. Actually, in the current design, we cannot guarantee that the node that tries to write its ownership is the real leader: whoever writes last becomes the owner. To address this issue, we need to store all the owners of the key. Only when the owner list is empty can the specific key (i.e. a checkpoint or job graph) be deleted. However, we may be left with a residual checkpoint or job graph when the old JobManager crashes unexpectedly and does not release the lock. To solve this problem completely, we need a timestamp renewal mechanism for CompletedCheckpointStore and JobGraphStore, which would let us detect a JobManager timeout and then clean up the residual keys.

3. Frankly speaking, I am not against this solution. However, in my opinion, it is more of a temporary proposal. We could use a StatefulSet to avoid leader election and leader retrieval. But I am not sure whether the TaskManager can properly handle the situation where the same hostname resolves to different IPs because the JobManager failed and was relaunched. We may also still have two JobManagers running in some corner cases (e.g. the kubelet is down but the pod is still running). Another concern is that FileSystemHAService has a strong dependency on a PersistentVolume (aka PV), which is not always available, especially in self-built Kubernetes clusters. Moreover, the PV provider must guarantee that each PV can only be mounted once. Since the native HA proposal covers all the functionality of the StatefulSet proposal, I prefer the former.



Best,
Yang


Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

tison
Hi,

Generally +1 for a native k8s HA service.

For leader election and publishing leader information, an earlier discussion[1]
pointed out that since these two actions are NOT atomic, there will always be
an edge case where a previous leader overwrites the leader information, even
with versioned writes. A versioned write only forces a re-read when the version
mismatches, so for versioned writes to work, the information in the kv pair
must let the contender tell whether it is still the current leader.
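
To make the edge case concrete, here is a minimal sketch of it. It assumes a hypothetical in-memory `FakeConfigMap` with a single version counter; the class, the `holder` and `address` keys, and the helper names are all illustrative and are not part of any Kubernetes client. It shows that a stale versioned write is rejected, that a blind re-read-and-retry by the old leader still clobbers the new leader's address, and that a retry which first checks the stored holder does not.

```java
import java.util.HashMap;
import java.util.Map;

public class VersionedWriteSketch {

    /** Hypothetical in-memory stand-in for one ConfigMap; not a real client API. */
    static final class FakeConfigMap {
        private long resourceVersion = 0;
        private final Map<String, String> data = new HashMap<>();

        synchronized long version() {
            return resourceVersion;
        }

        synchronized String get(String key) {
            return data.get(key);
        }

        /** Writes only if the caller still holds the latest version. */
        synchronized boolean putIfVersion(long expectedVersion, String key, String value) {
            if (expectedVersion != resourceVersion) {
                return false; // somebody else wrote in between
            }
            data.put(key, value);
            resourceVersion++;
            return true;
        }
    }

    /** Blind retry: re-read the version and write again without checking who leads. */
    static boolean blindRetryWrite(FakeConfigMap cm, String address) {
        return cm.putIfVersion(cm.version(), "address", address);
    }

    /** Guarded retry: give up if the stored holder is no longer this contender. */
    static boolean guardedRetryWrite(FakeConfigMap cm, String self, String address) {
        long version = cm.version(); // read the version first, then the holder
        if (!self.equals(cm.get("holder"))) {
            return false;
        }
        return cm.putIfVersion(version, "address", address);
    }

    static String runScenario() {
        FakeConfigMap cm = new FakeConfigMap();
        cm.putIfVersion(cm.version(), "holder", "jm-1");
        cm.putIfVersion(cm.version(), "address", "jm-1:6123");
        long staleVersion = cm.version(); // jm-1 reads, then stalls

        // jm-2 takes over and publishes its own address.
        cm.putIfVersion(cm.version(), "holder", "jm-2");
        cm.putIfVersion(cm.version(), "address", "jm-2:6123");

        boolean staleWrite = cm.putIfVersion(staleVersion, "address", "jm-1:6123");
        boolean guarded = guardedRetryWrite(cm, "jm-1", "jm-1:6123");
        boolean blind = blindRetryWrite(cm, "jm-1:6123");
        return staleWrite + " " + guarded + " " + blind + " " + cm.get("address");
    }

    public static void main(String[] args) {
        System.out.println(runScenario());
    }
}
```

In this sketch only the blind retry succeeds, which is exactly the overwrite the versioned write alone cannot prevent.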

The idea of writing the leader information on the contender node, or something
equivalent, makes sense, but the details depend on how it is implemented.
The general problems are:

1. The TM might be a bit late in picking up the correct leader information, but
as long as the leader election process is short and leadership is stable most
of the time, it won't be a serious issue.
2. The process by which the TM extracts the leader information might be a bit
more complex than directly watching a fixed key.

The atomicity issue can be addressed by leveraging low-level APIs such as
lease & txn, but that takes more development effort. The ConfigMap and its
encapsulated interface, though, provide only a self-consistent mechanism,
which doesn't promise further consistency for extensions.

Best,
tison.



Till Rohrmann <[hidden email]> wrote on Tue, Sep 29, 2020 at 9:25 PM:
For 1. I was wondering whether we can't write the leader connection
information directly when trying to obtain the leadership (trying to update
the leader key with one's own value)? This might be a little detail, though.

2. Alright, so we are having a similar mechanism as we have in ZooKeeper
with the ephemeral lock nodes. I guess that this complicates the
implementation a bit, unfortunately.

3. Wouldn't the StatefulSet solution also work without a PV? One could
configure a different persistent storage like HDFS or S3 for storing the
checkpoints and job blobs like in the ZooKeeper case. The current benefit I
see is that we avoid having to implement this multi locking mechanism in
the ConfigMaps using the annotations because we can be sure that there is
only a single leader at a time if I understood the guarantees of K8s
correctly.

Cheers,
Till

On Tue, Sep 29, 2020 at 8:10 AM Yang Wang <[hidden email]> wrote:

> Hi Till, thanks for your valuable feedback.
>
> 1. Yes, leader election and storing the leader information will use the same
> ConfigMap. When a contender successfully performs a versioned annotation
> update on the ConfigMap, it has been elected as the leader. It will then
> write the leader information in the callback of the leader elector[1]. The
> Kubernetes resource version will help us prevent the leader ConfigMap from
> being wrongly updated.
>
> 2. Lock and release is really a valid concern. In the current design, we
> cannot guarantee that the node that tries to write its ownership is the
> real leader: whoever writes last becomes the owner. To address this issue,
> we need to store all the owners of the key. Only when the owner list is
> empty can the specific key (i.e. a checkpoint or job graph) be deleted.
> However, we may be left with a residual checkpoint or job graph when the
> old JobManager crashes unexpectedly and does not release the lock. To solve
> this problem completely, we need a timestamp renewal mechanism for
> CompletedCheckpointStore and JobGraphStore, which would help us detect a
> JobManager timeout and then clean up the residual keys.
>
> 3. Frankly speaking, I am not against this solution. However, in my
> opinion, it is more of a temporary proposal. We could use a StatefulSet to
> avoid leader election and leader retrieval, but I am not sure whether the
> TaskManager can properly handle the same hostname resolving to different
> IPs after the JobManager fails and is relaunched. We may also still have
> two JobManagers running in some corner cases (e.g. the kubelet is down but
> the pod is still running). Another concern is the strong dependency on a
> PersistentVolume (PV) in FileSystemHAService. A PV is not always available,
> especially in self-built Kubernetes clusters. Moreover, the PV provider
> would have to guarantee that each PV can only be mounted once. Since the
> native HA proposal covers all the functionality of the StatefulSet
> proposal, I prefer the former.
>
>
> [1].
> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70
>
> Best,
> Yang
>
> Till Rohrmann <[hidden email]> wrote on Mon, Sep 28, 2020 at 9:29 PM:
>
>> Thanks for creating this FLIP Yang Wang. I believe that many of our users
>> will like a ZooKeeper-less HA setup.
>>
>> +1 for not separating the leader information and the leader election if
>> possible. Maybe it is even possible that the contender writes his leader
>> information directly when trying to obtain the leadership by performing a
>> versioned write operation.
>>
>> Concerning the lock and release operation I have a question: Can there be
>> multiple owners for a given key-value pair in a ConfigMap? If not, how can
>> we ensure that the node which writes his ownership is actually the leader
>> w/o transactional support from K8s? In ZooKeeper we had the same problem
>> (we should probably change it at some point to simply use a
>> transaction which checks whether the writer is still the leader) and
>> therefore introduced the ephemeral lock nodes. What they allow is that
>> there can be multiple owners of a given ZNode at a time. The last owner
>> will then be responsible for the cleanup of the node.
>>
>> I see the benefit of your proposal over the StatefulSet proposal because
>> it can support multiple standby JMs. Given the problem of locking key-value
>> pairs, it might be simpler to start with the StatefulSet approach, where we
>> only have a single JM. This might already add a lot of benefits for our
>> users. Was there a specific reason why you discarded this proposal (other
>> than generality)?
>>
>> @Uce it would be great to hear your feedback on the proposal since you
>> already implemented a K8s based HA service.
>>
>> Cheers,
>> Till
>>
>> On Thu, Sep 17, 2020 at 5:06 AM Yang Wang <[hidden email]> wrote:
>>
>>> Hi Xintong and Stephan,
>>>
>>> Thanks a lot for your attention on this FLIP. I will address the
>>> comments inline.
>>>
>>> # Architecture -> One or two ConfigMaps
>>>
>>> Both of you are right. One ConfigMap will make the design and
>>> implementation easier. Actually, in my POC code, I am using just one
>>> ConfigMap (e.g. "k8s-ha-app1-restserver" for the rest server component)
>>> for both leader election and storage. Once a JobManager wins the election,
>>> it updates the ConfigMap with the leader address and periodically renews
>>> the lock annotation to remain the active leader. I will update the FLIP
>>> document, including the architecture diagram, to avoid the
>>> misunderstanding.
>>>
>>>
>>> # HA storage > Lock and release
>>>
>>> This is a valid concern. ZooKeeper ephemeral nodes are deleted by the ZK
>>> server automatically when the client times out, which can happen in a bad
>>> network environment or when the ZK client crashes unexpectedly. For
>>> Kubernetes, we need to implement a similar mechanism. First, when we want
>>> to lock a specific key in the ConfigMap, we put the owner identity, lease
>>> duration, and renew time in the ConfigMap annotation. The annotation is
>>> cleaned up when the lock is released. When we want to remove a job graph
>>> or checkpoint, at least one of the following conditions must be satisfied.
>>> Otherwise, the delete operation cannot be performed.
>>> * The current instance is the owner of the key.
>>> * The owner annotation is empty, which means the owner has released the
>>> lock.
>>> * The owner annotation has timed out, which usually indicates that the
>>> owner died.
>>>
>>>
>>> # HA storage > HA data clean up
>>>
>>> Sorry that I did not clearly describe how the HA related ConfigMaps are
>>> retained. Thanks to the Kubernetes OwnerReference[1], we set the owner of
>>> the flink-conf ConfigMap, service and TaskManager pods to the JobManager
>>> Deployment. So when we want to destroy a Flink cluster, we just need to
>>> delete the deployment[2]. For the HA related ConfigMaps, we do not set the
>>> owner, so they are retained even when we delete the whole Flink cluster.
>>>
>>>
>>> [1].
>>> https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
>>> [2].
>>> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#stop-flink-session
>>>
>>>
>>> Best,
>>> Yang
>>>
>>>
>>> Stephan Ewen <[hidden email]> wrote on Wed, Sep 16, 2020 at 8:16 PM:
>>>
>>>> This is a very cool feature proposal.
>>>>
>>>> One lesson learned from the ZooKeeper-based HA is that it is overly
>>>> complicated to have the leader RPC address in a different node than the
>>>> LeaderLock. Extra code is needed to make sure these converge, and they
>>>> can be temporarily out of sync.
>>>>
>>>> A much easier design would be to have the RPC address as payload in the
>>>> lock entry (ZNode in ZK), the same way that the leader fencing token is
>>>> stored as payload of the lock.
>>>> I think for the design above it would mean having a single ConfigMap
>>>> for both leader lock and leader RPC address discovery.
>>>>
>>>> This probably serves as a good design principle in general: do not divide
>>>> information that is updated together across different resources.
>>>>
>>>> Best,
>>>> Stephan
>>>>
>>>>
>>>> On Wed, Sep 16, 2020 at 11:26 AM Xintong Song <[hidden email]>
>>>> wrote:
>>>>
>>>>> Thanks for preparing this FLIP, @Yang.
>>>>>
>>>>> In general, I'm +1 for this new feature. Leveraging Kubernetes's
>>>>> built-in ConfigMap for Flink's HA services should significantly reduce the
>>>>> maintenance overhead compared to deploying a ZK cluster. I think this is an
>>>>> attractive feature for users.
>>>>>
>>>>> Concerning the proposed design, I have some questions. Might not be
>>>>> problems, just trying to understand.
>>>>>
>>>>> ## Architecture
>>>>>
>>>>> Why does the leader election need two ConfigMaps (`lock for contending
>>>>> leader`, and `leader RPC address`)? What happens if the two ConfigMaps are
>>>>> not updated consistently? E.g., a TM learns about a new JM becoming leader
>>>>> (lock for contending leader updated), but still gets the old leader's
>>>>> address when trying to read `leader RPC address`?
>>>>>
>>>>> ## HA storage > Lock and release
>>>>>
>>>>> It seems to me that the owner needs to explicitly release the lock so
>>>>> that other peers can write/remove the stored object. What if the previous
>>>>> owner failed to release the lock (e.g., dead before releasing)? Would there
>>>>> be any problem?
>>>>>
>>>>> ## HA storage > HA data clean up
>>>>>
>>>>> If the ConfigMap is destroyed on `kubectl delete deploy <ClusterID>`,
>>>>> how is the HA data retained?
>>>>>
>>>>>
>>>>> Thank you~
>>>>>
>>>>> Xintong Song
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <[hidden email]>
>>>>> wrote:
>>>>>
>>>>>> Hi devs and users,
>>>>>>
>>>>>> I would like to start the discussion about FLIP-144[1], which will
>>>>>> introduce
>>>>>> a new native high availability service for Kubernetes.
>>>>>>
>>>>>> Currently, Flink provides a ZooKeeper HA service that is widely used
>>>>>> in production environments. It can be integrated into standalone
>>>>>> cluster, Yarn, and Kubernetes deployments. However, using ZooKeeper HA
>>>>>> on K8s incurs additional cost since we need to manage a ZooKeeper
>>>>>> cluster. In the meantime, K8s provides public APIs for leader
>>>>>> election[2] and configuration storage (i.e. ConfigMap[3]). We could
>>>>>> leverage these features to make running an HA-configured Flink cluster
>>>>>> on K8s more convenient.
>>>>>>
>>>>>> Both standalone on K8s and native K8s could benefit from the newly
>>>>>> introduced KubernetesHaService.
>>>>>>
>>>>>> [1].
>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
>>>>>> [2].
>>>>>> https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
>>>>>> [3]. https://kubernetes.io/docs/concepts/configuration/configmap/
>>>>>>
>>>>>> Looking forward to your feedback.
>>>>>>
>>>>>> Best,
>>>>>> Yang
>>>>>>
>>>>>

Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Yang Wang
Thanks Till and tison for your comments.

1. I am afraid we cannot do this if we are going to use the fabric8 Kubernetes client SDK for the leader election. The official Kubernetes Java client[1] does not support it either. We could only do so by implementing a new LeaderElector in Flink based on the very basic Kubernetes API, but it seems that we would not gain much from this.

2. Yes, the implementation will be a little complicated if we want to completely eliminate residual job graphs or checkpoints. Inspired by your suggestion, another solution has come to mind. We could use a single ConfigMap to store the JobManager leader, job graph, checkpoint counter, and checkpoints. Each job would have its own ConfigMap for HA metadata storage. It would then be easier to guarantee that only the leader can write to the ConfigMap, since “Get (check the leader) and Update (write back to the ConfigMap)” is a transactional operation.

3. Yes, StatefulSet(1) + ConfigMap + HDFS/S3 is also a solution. However, there is still a chance that two JobManagers are running and trying to get/delete a key in the same ConfigMap concurrently. Imagine that the kubelet (like the NodeManager in YARN) is down, so the JobManager pod cannot be deleted, and a new JobManager pod is launched. We are then in a similar situation to Deployment(1) + ConfigMap + HDFS/S3. The only benefit is that we do not need to implement a leader election/retrieval service.

@tison
Actually, I do not think we will have such an issue in the Kubernetes HA service. In the Kubernetes LeaderElector[2], the leader information is stored in the annotation of the leader ConfigMap, so an old leader cannot wrongly overwrite the leader information. Whenever a JobManager wants to write its leader information to the ConfigMap, it checks whether it is currently the leader. If not, nothing happens. Moreover, the Kubernetes Resource Version[3] ensures that no one else has snuck in and written a different update while the client was in the process of performing its update.
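
The check-then-write behaviour described here can be sketched roughly as follows. This is only an illustration under stated assumptions: `FakeConfigMap`, `Snapshot`, `acquire` and `checkAndUpdate` are hypothetical names modelling a ConfigMap in memory, the compare-and-set on the snapshot stands in for the resource version check, and lease timing is left out entirely. It is not the fabric8 or Flink API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public class LeaderGuardedUpdateSketch {

    /** Immutable snapshot of a hypothetical ConfigMap: version, leader annotation, data. */
    static final class Snapshot {
        final long resourceVersion;
        final String leaderAnnotation;
        final Map<String, String> data;

        Snapshot(long resourceVersion, String leaderAnnotation, Map<String, String> data) {
            this.resourceVersion = resourceVersion;
            this.leaderAnnotation = leaderAnnotation;
            this.data = Collections.unmodifiableMap(new HashMap<>(data));
        }
    }

    /** In-memory stand-in; replace() succeeds only if nobody wrote since the read. */
    static final class FakeConfigMap {
        private final AtomicReference<Snapshot> current =
                new AtomicReference<>(new Snapshot(0, null, Collections.emptyMap()));

        Snapshot get() {
            return current.get();
        }

        boolean replace(Snapshot expected, Snapshot updated) {
            return current.compareAndSet(expected, updated);
        }
    }

    /** Simplified election step: claim the leader annotation (no lease timing modelled). */
    static boolean acquire(FakeConfigMap cm, String identity) {
        Snapshot seen = cm.get();
        return cm.replace(seen, new Snapshot(seen.resourceVersion + 1, identity, seen.data));
    }

    /** Get, check the leader annotation, then update; retry only on version conflicts. */
    static boolean checkAndUpdate(FakeConfigMap cm, String identity, String key, String value) {
        while (true) {
            Snapshot seen = cm.get();
            if (!identity.equals(seen.leaderAnnotation)) {
                return false; // not the leader: nothing is written
            }
            Map<String, String> data = new HashMap<>(seen.data);
            data.put(key, value);
            if (cm.replace(seen, new Snapshot(seen.resourceVersion + 1, identity, data))) {
                return true;
            }
            // Conflict: another writer got in first, so re-read and re-check leadership.
        }
    }

    static String runScenario() {
        FakeConfigMap cm = new FakeConfigMap();
        acquire(cm, "jm-1");
        checkAndUpdate(cm, "jm-1", "address", "jm-1:6123");
        acquire(cm, "jm-2"); // leadership moves to jm-2
        boolean staleLeader = checkAndUpdate(cm, "jm-1", "address", "jm-1:6123");
        boolean newLeader = checkAndUpdate(cm, "jm-2", "address", "jm-2:6123");
        return staleLeader + " " + newLeader + " " + cm.get().data.get("address");
    }

    public static void main(String[] args) {
        System.out.println(runScenario());
    }
}
```

Because the leadership check and the write are validated against the same snapshot, a former leader either sees that it lost the annotation or hits a version conflict and re-checks, so in this model it never overwrites the new leader's entry.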



Best,
Yang


Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

tison
Thanks for your explanation. It would be fine as long as checking leadership and actually writing the information is atomic.

Best,
tison.


On Wed, Sep 30, 2020 at 3:57 PM Yang Wang <[hidden email]> wrote:
Thanks till and tison for your comments.

1. I am afraid we cannot do this if we use the fabric8 Kubernetes client SDK for the leader election. The official Kubernetes Java client[1] does not support it either. We would have to implement a new LeaderElector in Flink on top of the very basic Kubernetes API, and it seems that we would not gain much from that.

2. Yes, the implementation will be a little complicated if we want to completely eliminate residual job graphs or checkpoints. Inspired by your suggestion, a different solution has come to mind. We could use the same ConfigMap to store the JobManager leader, job graph, checkpoint counter, and checkpoints. Each job would have its own ConfigMap for the HA metadata storage. It would then be easier to guarantee that only the leader can write to the ConfigMap, since “Get(check the leader)-and-Update(write back to the ConfigMap)” is a transactional operation.

3. Yes, StatefulSet(1) + ConfigMap + HDFS/S3 is also a solution. However, there is still a chance that two JobManagers are running and trying to get/delete a key in the same ConfigMap concurrently. Imagine that the kubelet (like the NodeManager in YARN) is down, so the JobManager pod cannot be deleted, and a new JobManager pod is launched. We are then in a similar situation to Deployment(1) + ConfigMap + HDFS/S3. The only benefit is that we do not need to implement a leader election/retrieval service.

@tison
Actually, I do not think we will have such an issue in the Kubernetes HA service. In the Kubernetes LeaderElector[2], the leader information is stored in an annotation on the leader ConfigMap, so an old leader cannot wrongly overwrite the leader information. When a JobManager wants to write its leader information to the ConfigMap, it first checks whether it is currently the leader. If not, nothing will happen. Moreover, the Kubernetes Resource Version[3] ensures that no one else has snuck in and written a different update while the client was in the process of performing its update.
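
To make the "check the leader, then write with a resource version" idea concrete, here is a minimal in-memory sketch. It does not use the fabric8 client or any real Kubernetes API. `VersionedConfigMap`, `Snapshot`, `takeLeadership`, `writeIfLeader`, and the key names are hypothetical names made up for illustration, and lease expiry is left out entirely.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class VersionedWriteSketch {

    /** Immutable view of the (hypothetical) ConfigMap as seen by one read. */
    static final class Snapshot {
        final long resourceVersion;
        final String leader; // stand-in for the leader annotation
        final Map<String, String> data;

        Snapshot(long resourceVersion, String leader, Map<String, String> data) {
            this.resourceVersion = resourceVersion;
            this.leader = leader;
            this.data = Collections.unmodifiableMap(new HashMap<String, String>(data));
        }
    }

    /** In-memory stand-in for the API server holding a single ConfigMap. */
    static final class VersionedConfigMap {
        private Snapshot current = new Snapshot(0L, null, new HashMap<String, String>());

        synchronized Snapshot get() {
            return current;
        }

        /** Accepts the write only if `expected` is still the latest version. */
        synchronized boolean replace(Snapshot expected, String leader, Map<String, String> data) {
            if (expected.resourceVersion != current.resourceVersion) {
                return false; // someone else updated the ConfigMap in between
            }
            current = new Snapshot(current.resourceVersion + 1, leader, data);
            return true;
        }
    }

    /** Simplified leadership change; lease expiry checks are omitted. */
    static boolean takeLeadership(VersionedConfigMap cm, String self) {
        Snapshot snap = cm.get();
        return cm.replace(snap, self, snap.data);
    }

    /** Get(check the leader)-and-Update(write back), guarded by the resource version. */
    static boolean writeIfLeader(VersionedConfigMap cm, String self, String key, String value) {
        Snapshot snap = cm.get();
        if (!self.equals(snap.leader)) {
            return false; // not the leader in this snapshot: nothing happens
        }
        Map<String, String> updated = new HashMap<String, String>(snap.data);
        updated.put(key, value);
        return cm.replace(snap, snap.leader, updated);
    }

    public static void main(String[] args) {
        VersionedConfigMap cm = new VersionedConfigMap();
        takeLeadership(cm, "jm-1");
        System.out.println(writeIfLeader(cm, "jm-1", "checkpoint-counter", "1")); // true
        takeLeadership(cm, "jm-2");
        System.out.println(writeIfLeader(cm, "jm-1", "checkpoint-counter", "2")); // false
    }
}
```

The point of the sketch is that a write from a stale snapshot is rejected even if the writer still believes it is the leader, because any leadership change bumps the version of the same object.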



Best,
Yang

On Wed, Sep 30, 2020 at 3:21 PM tison <[hidden email]> wrote:
Hi,

Generally +1 for a native k8s HA service.

For leader election and publishing leader information, a discussion[1]
pointed out that since these two actions are NOT atomic, there will always
be edge cases where a previous leader overwrites the leader information, even
with versioned writes. A versioned write only forces a re-read when the version
mismatches, so for versioned writes to work, the information in the kv pair
should let the contender determine whether it is the current leader.

The idea of writing leader information on the contender node or something
equivalent makes sense, but the details depend on how it is implemented.
General problems are that

1. The TM might be a bit late in picking up the correct leader information, but
as long as the leader election process is short and leadership is stable most
of the time, it won't be a serious issue.
2. The process by which the TM extracts leader information might be a bit more
complex than directly watching a fixed key.

The atomicity issue can be addressed by using low-level APIs such as lease & txn,
but that requires more development effort. The ConfigMap and its encapsulated
interface, though, provide only a self-consistent mechanism, which doesn't
promise more consistency for extensions.

Best,
tison.



On Tue, Sep 29, 2020 at 9:25 PM Till Rohrmann <[hidden email]> wrote:
For 1. I was wondering whether we can't write the leader connection
information directly when trying to obtain the leadership (trying to update
the leader key with one's own value)? This might be a little detail, though.

2. Alright, so we are having a similar mechanism as we have in ZooKeeper
with the ephemeral lock nodes. I guess that this complicates the
implementation a bit, unfortunately.

3. Wouldn't the StatefulSet solution also work without a PV? One could
configure a different persistent storage like HDFS or S3 for storing the
checkpoints and job blobs like in the ZooKeeper case. The current benefit I
see is that we avoid having to implement this multi locking mechanism in
the ConfigMaps using the annotations because we can be sure that there is
only a single leader at a time if I understood the guarantees of K8s
correctly.

Cheers,
Till

On Tue, Sep 29, 2020 at 8:10 AM Yang Wang <[hidden email]> wrote:

> Hi Till, thanks for your valuable feedback.
>
> 1. Yes, leader election and storing leader information will use the same
> ConfigMap. When a contender successfully performs a versioned annotation
> update operation on the ConfigMap, it means that it has been elected as the
> leader. It will then write the leader information in the callback of the
> leader elector[1]. The Kubernetes resource version will help us prevent the
> leader ConfigMap from being wrongly updated.
>
> 2. The lock and release is really a valid concern. Actually, in the current
> design, we cannot guarantee that the node which tries to write its
> ownership is the real leader. Whoever writes last becomes the owner. To
> address this issue, we need to store all the owners of the key. Only when
> the owner list is empty can the specific key (i.e. a checkpoint or job
> graph) be deleted. However, we may have a residual checkpoint or job graph
> when the old JobManager crashes unexpectedly and does not release the lock.
> To solve this problem completely, we need a timestamp renewal mechanism
> for CompletedCheckpointStore and JobGraphStore, which could help us detect
> the JobManager timeout and then clean up the residual keys.
>
> 3. Frankly speaking, I am not against this solution. However, in my
> opinion, it is more like a temporary proposal. We could use a StatefulSet to
> avoid leader election and leader retrieval. But I am not sure whether the
> TaskManager could properly handle the situation where the same hostname has
> different IPs because the JobManager failed and was relaunched. Also, we may
> still have two JobManagers running in some corner cases (e.g. the kubelet is
> down but the pod is running). Another concern is that we have a strong
> dependency on the PersistentVolume (aka PV) in FileSystemHAService, but a PV
> is not always available, especially in self-built Kubernetes clusters.
> Moreover, the PV provider should guarantee that each PV can only be mounted
> once. Since the native HA proposal covers all the functionality of the
> StatefulSet proposal, I prefer the former.
>
>
> [1].
> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70
>
> Best,
> Yang
>
> On Mon, Sep 28, 2020 at 9:29 PM Till Rohrmann <[hidden email]> wrote:
>
>> Thanks for creating this FLIP Yang Wang. I believe that many of our users
>> will like a ZooKeeper-less HA setup.
>>
>> +1 for not separating the leader information and the leader election if
>> possible. Maybe it is even possible that the contender writes his leader
>> information directly when trying to obtain the leadership by performing a
>> versioned write operation.
>>
>> Concerning the lock and release operation I have a question: Can there be
>> multiple owners for a given key-value pair in a ConfigMap? If not, how can
>> we ensure that the node which writes his ownership is actually the leader
>> w/o transactional support from K8s? In ZooKeeper we had the same problem
>> (we should probably change it at some point to simply use a
>> transaction which checks whether the writer is still the leader) and
>> therefore introduced the ephemeral lock nodes. What they allow is that
>> there can be multiple owners of a given ZNode at a time. The last owner
>> will then be responsible for the cleanup of the node.
>>
>> I see the benefit of your proposal over the stateful set proposal because
>> it can support multiple standby JMs. Given the problem of locking key-value
>> pairs it might be simpler to start with this approach where we only have
>> a single JM. This might already add a lot of benefits for our users. Was
>> there a specific reason why you discarded this proposal (other than
>> generality)?
>>
>> @Uce it would be great to hear your feedback on the proposal since you
>> already implemented a K8s based HA service.
>>
>> Cheers,
>> Till
>>
>> On Thu, Sep 17, 2020 at 5:06 AM Yang Wang <[hidden email]> wrote:
>>
>>> Hi Xintong and Stephan,
>>>
>>> Thanks a lot for your attention on this FLIP. I will address the
>>> comments inline.
>>>
>>> # Architecture -> One or two ConfigMaps
>>>
>>> Both of you are right. One ConfigMap will make the design and
>>> implementation easier. Actually, in my POC code,
>>> I am using just one ConfigMap (e.g. "k8s-ha-app1-restserver" for the rest
>>> server component) for the leader election
>>> and storage. Once a JobManager wins the election, it will update the
>>> ConfigMap with the leader address and periodically
>>> renew the lock annotation to remain the active leader. I will update
>>> the FLIP document, including the architecture diagram,
>>> to avoid the misunderstanding.
>>>
>>>
>>> # HA storage > Lock and release
>>>
>>> This is a valid concern. ZooKeeper ephemeral nodes are
>>> deleted by the ZK server automatically when
>>> the client times out, which could happen in a bad network environment or
>>> when the ZK client crashes unexpectedly. For Kubernetes,
>>> we need to implement a similar mechanism. First, when we want to lock a
>>> specific key in the ConfigMap, we will put the owner identity,
>>> lease duration, and renew time in the ConfigMap annotation. The annotation
>>> will be cleaned up when releasing the lock. When
>>> we want to remove a job graph or checkpoint, one of the
>>> following conditions must be satisfied. If not, the delete operation cannot be done.
>>> * The current instance is the owner of the key.
>>> * The owner annotation is empty, which means the owner has released the
>>> lock.
>>> * The owner annotation has timed out, which usually indicates the owner died.
>>>
>>>
>>> # HA storage > HA data clean up
>>>
>>> Sorry that I did not clearly describe how the HA-related ConfigMap is
>>> retained. Thanks to the Kubernetes OwnerReference[1],
>>> we set the owner of the flink-conf ConfigMap, service, and TaskManager pods
>>> to the JobManager Deployment. So when we want to
>>> destroy a Flink cluster, we just need to delete the deployment[2]. For
>>> the HA-related ConfigMaps, we do not set the owner,
>>> so they are retained even when we delete the whole Flink
>>> cluster.
>>>
>>>
>>> [1].
>>> https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
>>> [2].
>>> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#stop-flink-session
>>>
>>>
>>> Best,
>>> Yang
>>>
>>>
>>> On Wed, Sep 16, 2020 at 8:16 PM Stephan Ewen <[hidden email]> wrote:
>>>
>>>> This is a very cool feature proposal.
>>>>
>>>> One lesson-learned from the ZooKeeper-based HA is that it is overly
>>>> complicated to have the Leader RPC address in a different node than the
>>>> LeaderLock. There is extra code needed to make sure these converge and they
>>>> can be temporarily out of sync.
>>>>
>>>> A much easier design would be to have the RPC address as payload in the
>>>> lock entry (ZNode in ZK), the same way that the leader fencing token is
>>>> stored as payload of the lock.
>>>> I think for the design above it would mean having a single ConfigMap
>>>> for both leader lock and leader RPC address discovery.
>>>>
>>>> This probably serves as a good design principle in general - not divide
>>>> information that is updated together over different resources.
>>>>
>>>> Best,
>>>> Stephan
>>>>
>>>>
>>>> On Wed, Sep 16, 2020 at 11:26 AM Xintong Song <[hidden email]>
>>>> wrote:
>>>>
>>>>> Thanks for preparing this FLIP, @Yang.
>>>>>
>>>>> In general, I'm +1 for this new feature. Leveraging Kubernetes's
>>>>> built-in ConfigMap for Flink's HA services should significantly reduce the
>>>>> maintenance overhead compared to deploying a ZK cluster. I think this is an
>>>>> attractive feature for users.
>>>>>
>>>>> Concerning the proposed design, I have some questions. Might not be
>>>>> problems, just trying to understand.
>>>>>
>>>>> ## Architecture
>>>>>
>>>>> Why does the leader election need two ConfigMaps (`lock for contending
>>>>> leader`, and `leader RPC address`)? What happens if the two ConfigMaps are
>>>>> not updated consistently? E.g., a TM learns about a new JM becoming leader
>>>>> (lock for contending leader updated), but still gets the old leader's
>>>>> address when trying to read `leader RPC address`?
>>>>>
>>>>> ## HA storage > Lock and release
>>>>>
>>>>> It seems to me that the owner needs to explicitly release the lock so
>>>>> that other peers can write/remove the stored object. What if the previous
>>>>> owner failed to release the lock (e.g., dead before releasing)? Would there
>>>>> be any problem?
>>>>>
>>>>> ## HA storage > HA data clean up
>>>>>
>>>>> If the ConfigMap is destroyed on `kubectl delete deploy <ClusterID>`,
>>>>> how is the HA data retained?
>>>>>
>>>>>
>>>>> Thank you~
>>>>>
>>>>> Xintong Song
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <[hidden email]>
>>>>> wrote:
>>>>>
>>>>>> Hi devs and users,
>>>>>>
>>>>>> I would like to start the discussion about FLIP-144[1], which will
>>>>>> introduce
>>>>>> a new native high availability service for Kubernetes.
>>>>>>
>>>>>> Currently, Flink has provided Zookeeper HA service and been widely
>>>>>> used
>>>>>> in production environments. It could be integrated in standalone
>>>>>> cluster,
>>>>>> Yarn, Kubernetes deployments. However, using the Zookeeper HA in K8s
>>>>>> will take additional cost since we need to manage a Zookeeper cluster.
>>>>>> In the meantime, K8s has provided some public API for leader
>>>>>> election[2]
>>>>>> and configuration storage(i.e. ConfigMap[3]). We could leverage these
>>>>>> features and make running HA configured Flink cluster on K8s more
>>>>>> convenient.
>>>>>>
>>>>>> Both the standalone on K8s and native K8s could benefit from the new
>>>>>> introduced KubernetesHaService.
>>>>>>
>>>>>> [1].
>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
>>>>>> [2].
>>>>>> https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
>>>>>> [3]. https://kubernetes.io/docs/concepts/configuration/configmap/
>>>>>>
>>>>>> Looking forward to your feedback.
>>>>>>
>>>>>> Best,
>>>>>> Yang
>>>>>>
>>>>>

Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Till Rohrmann
Thanks for the clarifications Yang Wang.

2. Keeping the HA information relevant for a component (Dispatcher, JobManager, ResourceManager) in a single ConfigMap sounds good. We should check that we don't exceed the 1 MB size limit with this approach though. The Dispatcher's ConfigMap would then contain the current leader, the running jobs and the pointers to the persisted JobGraphs. The JobManager's ConfigMap would then contain the current leader, the pointers to the checkpoints and the checkpoint ID counter, for example.
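
As a rough illustration of the per-component layout described above, the sketch below builds the key-value data that each ConfigMap might carry. All key names (`leader`, `jobGraph-<jobId>`, `checkpoint-<id>`, `checkpointIdCounter`) and the pointer strings are hypothetical placeholders chosen for this example. They are not taken from the FLIP or from any Flink code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class HaConfigMapLayoutSketch {

    /** Data for a Dispatcher ConfigMap: current leader plus one pointer per running job. */
    static Map<String, String> dispatcherData(String leaderAddress,
                                              Map<String, String> jobGraphPointers) {
        Map<String, String> data = new LinkedHashMap<String, String>();
        data.put("leader", leaderAddress);
        for (Map.Entry<String, String> e : jobGraphPointers.entrySet()) {
            // Only a pointer to the persisted JobGraph is stored, not the graph itself.
            data.put("jobGraph-" + e.getKey(), e.getValue());
        }
        return data;
    }

    /** Data for a per-job JobManager ConfigMap: leader, checkpoint pointers, ID counter. */
    static Map<String, String> jobManagerData(String leaderAddress,
                                              long checkpointIdCounter,
                                              Map<Long, String> checkpointPointers) {
        Map<String, String> data = new LinkedHashMap<String, String>();
        data.put("leader", leaderAddress);
        data.put("checkpointIdCounter", Long.toString(checkpointIdCounter));
        for (Map.Entry<Long, String> e : checkpointPointers.entrySet()) {
            data.put("checkpoint-" + e.getKey(), e.getValue());
        }
        return data;
    }

    public static void main(String[] args) {
        Map<String, String> jobs = new LinkedHashMap<String, String>();
        jobs.put("job-a", "pointer-to-job-graph-a"); // placeholder pointer value
        System.out.println(dispatcherData("leader-address-placeholder", jobs).keySet());
    }
}
```

Because every entry is a short string, the size of each ConfigMap grows with the number of jobs or retained checkpoints rather than with the size of the job graphs or state.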

3. Ah ok, I somehow thought that K8s would give us stronger guarantees than Yarn in this regard. That's a pity.

Cheers,
Till


Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Yang Wang
2. Yes. This is exactly what I mean: storing the HA information relevant to a specific component in a single ConfigMap and ensuring that “Get(check the leader)-and-Update(write back to the ConfigMap)” is a transactional operation. Since we only store the job graph state handle (not the real data) in the ConfigMap, I think 1 MB is big enough for the dispatcher-leader ConfigMap (the biggest one, with multiple jobs). By my rough calculation, we could have more than 1000 Flink jobs in a Flink session cluster.
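
A back-of-the-envelope version of that estimate, as a sketch only: it assumes the ConfigMap size limit is 1 MiB (1,048,576 bytes) and uses a hypothetical footprint of about 1000 bytes per job for the key plus the serialized pointer. That per-job figure is an assumption for illustration, not a measurement from Flink, and the sketch ignores the space taken by the leader entry and annotations.

```java
public class ConfigMapBudgetSketch {

    /** Assumed ConfigMap data size limit: 1 MiB. */
    static final long LIMIT_BYTES = 1024L * 1024L;

    /** How many job entries fit if each one takes bytesPerJob bytes. */
    static long maxJobs(long bytesPerJob) {
        if (bytesPerJob <= 0) {
            throw new IllegalArgumentException("bytesPerJob must be positive");
        }
        return LIMIT_BYTES / bytesPerJob;
    }

    /** How many bytes each job entry may use if `jobs` entries must fit. */
    static long bytesAvailablePerJob(long jobs) {
        if (jobs <= 0) {
            throw new IllegalArgumentException("jobs must be positive");
        }
        return LIMIT_BYTES / jobs;
    }

    public static void main(String[] args) {
        System.out.println(maxJobs(1000));              // 1048
        System.out.println(bytesAvailablePerJob(1000)); // 1048
    }
}
```

So "more than 1000 jobs" holds as long as each job's entry stays at or below roughly 1 KB, which is plausible when only a pointer is stored.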

3. Actually, K8s has a stronger guarantee than YARN. The StatefulSet can provide at-most-one semantics as long as no manual force-deletion happens[1]. Based on the previous discussion, we have successfully avoided the "lock-and-release" in the implementation, so I would still prefer to use the current Deployment.




Best,
Yang

On Wed, Sep 30, 2020 at 11:57 PM Till Rohrmann <[hidden email]> wrote:
Thanks for the clarifications Yang Wang.

2. Keeping the HA information relevant for a component (Dispatcher, JobManager, ResourceManager) in a single ConfigMap sounds good. We should check that we don't exceed the 1 MB size limit with this approach though. The Dispatcher's ConfigMap would then contain the current leader, the running jobs and the pointers to the persisted JobGraphs. The JobManager's ConfigMap would then contain the current leader, the pointers to the checkpoints and the checkpoint ID counter, for example.

3. Ah ok, I somehow thought that K8s would give us stronger guarantees than Yarn in this regard. That's a pity.

Cheers,
Till

On Wed, Sep 30, 2020 at 10:03 AM tison <[hidden email]> wrote:
Thanks for your explanation. It would be fine as long as checking leadership and actually writing the information are atomic.

Best,
tison.


Yang Wang <[hidden email]> 于2020年9月30日周三 下午3:57写道:
Thanks Till and tison for your comments.

1. I am afraid we could not do this if we are going to use the fabric8 Kubernetes client SDK for the leader election. The official Kubernetes Java client[1] does not support it either, unless we implement a new LeaderElector in Flink based on the very basic Kubernetes API. But it seems that we would not gain much from this.

2. Yes, the implementation will be a little complicated if we want to completely eliminate the residual job graphs or checkpoints. Inspired by your suggestion, a different solution has come to my mind. We could use the same ConfigMap to store the JobManager leader, job graph, checkpoint counter, and checkpoints. Each job will have a specific ConfigMap for the HA metadata storage. Then it will be easier to guarantee that only the leader can write the ConfigMap, since “Get(check the leader)-and-Update(write back to the ConfigMap)” is a transactional operation.

3. Yes, StatefulSet(1) + ConfigMap + HDFS/S3 is also a solution. However, there is still a chance that two JobManagers are running and trying to get/delete a key in the same ConfigMap concurrently. Imagine that the kubelet (like the NodeManager in YARN) is down, so the JobManager pod cannot be deleted, and a new JobManager pod is launched. We are then in a similar situation to Deployment(1) + ConfigMap + HDFS/S3. The only benefit is that we do not need to implement a leader election/retrieval service.

@tison
Actually, I do not think we will have such an issue in the Kubernetes HA service. In the Kubernetes LeaderElector[2], the leader information is stored in an annotation of the leader ConfigMap, so the old leader cannot wrongly override the leader information. Once a JobManager wants to write its leader information to the ConfigMap, it will check whether it is the leader now. If not, nothing will happen. Moreover, the Kubernetes Resource Version[3] ensures that no one else has snuck in and written a different update while the client was in the process of performing its update.
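The "check the leader, then write back" step described above can be sketched as an optimistic-concurrency update. This is a minimal in-memory illustration, not Flink or fabric8 code: `FakeApiServer`, `check_and_update` and the `leader` annotation key are hypothetical names, and the integer `resource_version` stands in for the opaque resourceVersion string that a real API server compares (rejecting stale writes with an HTTP 409 conflict).

```python
from dataclasses import dataclass, field

LEADER_ANNOTATION = "leader"  # hypothetical annotation key, for illustration only


class ConflictError(Exception):
    """Raised when a write carries a stale resource version."""


@dataclass
class ConfigMap:
    resource_version: int = 0
    annotations: dict = field(default_factory=dict)
    data: dict = field(default_factory=dict)


class FakeApiServer:
    """In-memory stand-in for the API server holding one ConfigMap."""

    def __init__(self):
        self._cm = ConfigMap()

    def get(self):
        # Hand out a copy so callers cannot mutate the stored object directly.
        return ConfigMap(self._cm.resource_version,
                         dict(self._cm.annotations),
                         dict(self._cm.data))

    def replace(self, updated):
        # Reject the write if someone else updated the object in the meantime.
        if updated.resource_version != self._cm.resource_version:
            raise ConflictError("stale resource version")
        self._cm = ConfigMap(updated.resource_version + 1,
                             dict(updated.annotations),
                             dict(updated.data))


def check_and_update(api, my_id, key, value):
    """Write key=value only if my_id is the leader and nobody wrote in between."""
    cm = api.get()
    if cm.annotations.get(LEADER_ANNOTATION) != my_id:
        return False  # not the leader: nothing happens
    cm.data[key] = value
    try:
        api.replace(cm)
    except ConflictError:
        return False  # lost the race; the caller may re-read and retry
    return True
```

Because the leader annotation and the payload live in the same object, a former leader that read the ConfigMap before losing leadership cannot overwrite it afterwards: its write carries an outdated version and is rejected.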



Best,
Yang

tison <[hidden email]> 于2020年9月30日周三 下午3:21写道:
Hi,

Generally +1 for a native k8s HA service.

For leader election & publishing leader information, there was a discussion[1]
which pointed out that since these two actions are NOT atomic, there will always
be an edge case where a previous leader overwrites the leader information, even
with versioned writes. A versioned write only forces a re-read when the version
mismatches, so for versioned writes to work, the information in the kv pair
should let the contender tell whether it is the current leader.

The idea of writing leader information on the contender node or something
equivalent makes sense, but the details depend on how it is implemented.
General problems are that

1. The TM might be a bit late in picking up the correct leader information, but
as long as the leader election process is short and leadership is stable most
of the time, it won't be a serious issue.
2. The process by which the TM extracts leader information might be a bit more
complex than directly watching a fixed key.

The atomicity issue can be addressed if one leverages low-level APIs such as
lease & txn, but that takes more development effort. The ConfigMap and the
encapsulated interface, though, provide only a self-consistent mechanism which
doesn't promise more consistency for extensions.

Best,
tison.



Till Rohrmann <[hidden email]> 于2020年9月29日周二 下午9:25写道:
For 1. I was wondering whether we can't write the leader connection
information directly when trying to obtain the leadership (trying to update
the leader key with one's own value)? This might be a little detail, though.

2. Alright, so we are having a similar mechanism as we have in ZooKeeper
with the ephemeral lock nodes. I guess that this complicates the
implementation a bit, unfortunately.

3. Wouldn't the StatefulSet solution also work without a PV? One could
configure a different persistent storage like HDFS or S3 for storing the
checkpoints and job blobs like in the ZooKeeper case. The current benefit I
see is that we avoid having to implement this multi locking mechanism in
the ConfigMaps using the annotations because we can be sure that there is
only a single leader at a time if I understood the guarantees of K8s
correctly.

Cheers,
Till

On Tue, Sep 29, 2020 at 8:10 AM Yang Wang <[hidden email]> wrote:

> Hi Till, thanks for your valuable feedback.
>
> 1. Yes, leader election and storing leader information will use the same
> ConfigMap. When a contender successfully performs a versioned annotation
> update operation on the ConfigMap, it means that it has been elected as the
> leader. It will then write the leader information in the callback of the
> leader elector[1]. The Kubernetes resource version will help us prevent the
> leader ConfigMap from being wrongly updated.
>
> 2. The lock and release is really a valid concern. Actually, in the current
> design, we cannot guarantee that the node which tries to write its
> ownership is the real leader. Whoever writes last is the owner. To
> address this issue, we need to store all the owners of the key. Only when
> the owner list is empty can the specific key (meaning a checkpoint or job
> graph) be deleted. However, we may have a residual checkpoint or job graph
> when the old JobManager crashes unexpectedly and does not release the lock.
> To solve this problem completely, we need a timestamp renewal mechanism
> for CompletedCheckpointStore and JobGraphStore, which could help us
> detect the JobManager timeout and then clean up the residual keys.
>
> 3. Frankly speaking, I am not against this solution. However, in my
> opinion, it is more like a temporary proposal. We could use StatefulSet to
> avoid leader election and leader retrieval. But I am not sure whether the
> TaskManager could properly handle the same hostname resolving to
> different IPs after the JobManager fails and is relaunched. Also, we may
> still have two JobManagers running in some corner cases (e.g. the kubelet is
> down but the pod is still running). Another concern is that we have a strong
> dependency on the PersistentVolume (aka PV) in FileSystemHAService, and a PV
> is not always available, especially in self-built Kubernetes clusters.
> Moreover, the PV provider should guarantee that each PV can only be mounted
> once. Since the native HA proposal could cover all the functionality of the
> StatefulSet proposal, I prefer the former.
>
>
> [1].
> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70
>
> Best,
> Yang
>
> Till Rohrmann <[hidden email]> 于2020年9月28日周一 下午9:29写道:
>
>> Thanks for creating this FLIP Yang Wang. I believe that many of our users
>> will like a ZooKeeper-less HA setup.
>>
>> +1 for not separating the leader information and the leader election if
>> possible. Maybe it is even possible that the contender writes his leader
>> information directly when trying to obtain the leadership by performing a
>> versioned write operation.
>>
>> Concerning the lock and release operation I have a question: Can there be
>> multiple owners for a given key-value pair in a ConfigMap? If not, how can
>> we ensure that the node which writes his ownership is actually the leader
>> w/o transactional support from K8s? In ZooKeeper we had the same problem
>> (we should probably change it at some point to simply use a
>> transaction which checks whether the writer is still the leader) and
>> therefore introduced the ephemeral lock nodes. What they allow is that
>> there can be multiple owners of a given ZNode at a time. The last owner
>> will then be responsible for the cleanup of the node.
>>
>> I see the benefit of your proposal over the stateful set proposal because
>> it can support multiple standby JMs. Given the problem of locking key-value
>> pairs it might be simpler to start with this approach where we only have
>> a single JM. This might already add a lot of benefits for our users. Was
>> there a specific reason why you discarded this proposal (other than
>> generality)?
>>
>> @Uce it would be great to hear your feedback on the proposal since you
>> already implemented a K8s based HA service.
>>
>> Cheers,
>> Till
>>
>> On Thu, Sep 17, 2020 at 5:06 AM Yang Wang <[hidden email]> wrote:
>>
>>> Hi Xintong and Stephan,
>>>
>>> Thanks a lot for your attention on this FLIP. I will address the
>>> comments inline.
>>>
>>> # Architecture -> One or two ConfigMaps
>>>
>>> Both of you are right. One ConfigMap will make the design and
>>> implementation easier. Actually, in my POC code,
>>> I am using just one ConfigMap (e.g. "k8s-ha-app1-restserver" for the rest
>>> server component) for the leader election
>>> and storage. Once a JobManager wins the election, it will update the
>>> ConfigMap with the leader address and periodically
>>> renew the lock annotation to remain the active leader. I will update
>>> the FLIP document, including the architecture diagram,
>>> to avoid the misunderstanding.
>>>
>>>
>>> # HA storage > Lock and release
>>>
>>> This is a valid concern. ZooKeeper ephemeral nodes are
>>> deleted by the ZK server automatically when
>>> the client times out, which could happen in a bad network environment or
>>> when the ZK client crashes unexpectedly. For Kubernetes,
>>> we need to implement a similar mechanism. First, when we want to lock a
>>> specific key in a ConfigMap, we will put the owner identity,
>>> lease duration, and renew time in the ConfigMap annotation. The annotation
>>> will be cleaned up when releasing the lock. When
>>> we want to remove a job graph or checkpoint, one of the
>>> following conditions must be satisfied. If not, the delete operation cannot be done.
>>> * Current instance is the owner of the key.
>>> * The owner annotation is empty, which means the owner has released the
>>> lock.
>>> * The owner annotation timed out, which usually indicates the owner died.
>>>
>>>
>>> # HA storage > HA data clean up
>>>
>>> Sorry that I did not clearly describe how the HA related ConfigMap is
>>> retained. Thanks to the Kubernetes OwnerReference[1],
>>> we set the owner of the flink-conf ConfigMap, service and TaskManager pods
>>> to the JobManager Deployment. So when we want to
>>> destroy a Flink cluster, we just need to delete the deployment[2]. For
>>> the HA related ConfigMaps, we do not set the owner
>>> so that they are retained even though we delete the whole Flink
>>> cluster.
>>>
>>>
>>> [1].
>>> https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
>>> [2].
>>> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#stop-flink-session
>>>
>>>
>>> Best,
>>> Yang
>>>
>>>
>>> Stephan Ewen <[hidden email]> 于2020年9月16日周三 下午8:16写道:
>>>
>>>> This is a very cool feature proposal.
>>>>
>>>> One lesson-learned from the ZooKeeper-based HA is that it is overly
>>>> complicated to have the Leader RPC address in a different node than the
>>>> LeaderLock. There is extra code needed to make sure these converge and they
>>>> can be temporarily out of sync.
>>>>
>>>> A much easier design would be to have the RPC address as payload in the
>>>> lock entry (ZNode in ZK), the same way that the leader fencing token is
>>>> stored as payload of the lock.
>>>> I think for the design above it would mean having a single ConfigMap
>>>> for both leader lock and leader RPC address discovery.
>>>>
>>>> This probably serves as a good design principle in general - not divide
>>>> information that is updated together over different resources.
>>>>
>>>> Best,
>>>> Stephan
>>>>
>>>>
>>>> On Wed, Sep 16, 2020 at 11:26 AM Xintong Song <[hidden email]>
>>>> wrote:
>>>>
>>>>> Thanks for preparing this FLIP, @Yang.
>>>>>
>>>>> In general, I'm +1 for this new feature. Leveraging Kubernetes's
>>>>> built-in ConfigMap for Flink's HA services should significantly reduce the
>>>>> maintenance overhead compared to deploying a ZK cluster. I think this is an
>>>>> attractive feature for users.
>>>>>
>>>>> Concerning the proposed design, I have some questions. Might not be
>>>>> problems, just trying to understand.
>>>>>
>>>>> ## Architecture
>>>>>
>>>>> Why does the leader election need two ConfigMaps (`lock for contending
>>>>> leader`, and `leader RPC address`)? What happens if the two ConfigMaps are
>>>>> not updated consistently? E.g., a TM learns about a new JM becoming leader
>>>>> (lock for contending leader updated), but still gets the old leader's
>>>>> address when trying to read `leader RPC address`?
>>>>>
>>>>> ## HA storage > Lock and release
>>>>>
>>>>> It seems to me that the owner needs to explicitly release the lock so
>>>>> that other peers can write/remove the stored object. What if the previous
>>>>> owner failed to release the lock (e.g., dead before releasing)? Would there
>>>>> be any problem?
>>>>>
>>>>> ## HA storage > HA data clean up
>>>>>
>>>>> If the ConfigMap is destroyed on `kubectl delete deploy <ClusterID>`,
>>>>> how are the HA data retained?
>>>>>
>>>>>
>>>>> Thank you~
>>>>>
>>>>> Xintong Song
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <[hidden email]>
>>>>> wrote:
>>>>>
>>>>>> Hi devs and users,
>>>>>>
>>>>>> I would like to start the discussion about FLIP-144[1], which will
>>>>>> introduce
>>>>>> a new native high availability service for Kubernetes.
>>>>>>
>>>>>> Currently, Flink provides a ZooKeeper HA service, which is widely
>>>>>> used
>>>>>> in production environments. It can be integrated into standalone
>>>>>> cluster,
>>>>>> Yarn, and Kubernetes deployments. However, using the ZooKeeper HA in K8s
>>>>>> incurs additional cost since we need to manage a ZooKeeper cluster.
>>>>>> In the meantime, K8s provides public APIs for leader
>>>>>> election[2]
>>>>>> and configuration storage (i.e. ConfigMap[3]). We could leverage these
>>>>>> features to make running an HA-configured Flink cluster on K8s more
>>>>>> convenient.
>>>>>>
>>>>>> Both standalone on K8s and native K8s could benefit from the newly
>>>>>> introduced KubernetesHaService.
>>>>>>
>>>>>> [1].
>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
>>>>>> [2].
>>>>>> https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
>>>>>> [3]. https://kubernetes.io/docs/concepts/configuration/configmap/
>>>>>>
>>>>>> Looking forward to your feedback.
>>>>>>
>>>>>> Best,
>>>>>> Yang
>>>>>>
>>>>>

Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Till Rohrmann
3. We could avoid force deletions from within Flink. If the user does it, then we don't give guarantees.

I am fine with your current proposal. +1 for moving forward with it.

Cheers,
Till


Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Yang Wang
3. Makes sense to me. And we could add a new HA solution "StatefulSet + PV + FileSystem"
at any time if we need it in the future.

Since there are no more open questions, I will start the voting now.
Thanks all for your comments and feedback. Feel free to continue the discussion if you have
other concerns.


Best,
Yang

Till Rohrmann <[hidden email]> 于2020年10月1日周四 下午4:52写道:
3. We could avoid force deletions from within Flink. If the user does it,
then we don't give guarantees.

I am fine with your current proposal. +1 for moving forward with it.

Cheers,
Till

On Thu, Oct 1, 2020 at 2:32 AM Yang Wang <[hidden email]> wrote:

> 2. Yes. This is exactly what I mean. Storing the HA information relevant
> to a specific component in a single ConfigMap and ensuring that “Get(check
> the leader)-and-Update(write back to the ConfigMap)” is a transactional
> operation. Since we only store the job graph state handle (not the real
> data) in the ConfigMap, I think 1 MB is big enough for the dispatcher-leader
> ConfigMap (the biggest one, with multiple jobs). By my rough estimate, we
> could have more than 1000 Flink jobs in a Flink session cluster.
>
> 3. Actually, K8s has a stronger guarantee than YARN. And the StatefulSet
> could provide at-most-one semantics as long as no manual force deletion
> happens[1]. Based on the previous discussion, we have successfully avoided
> the "lock-and-release" in the implementation. So I would still prefer to
> keep the current Deployment.
>
>
> [1].
> https://kubernetes.io/docs/tasks/run-application/force-delete-stateful-set-pod/#force-deletion
>
>
> Best,
> Yang
>
> On Wed, Sep 30, 2020 at 11:57 PM Till Rohrmann <[hidden email]> wrote:
>
>> Thanks for the clarifications Yang Wang.
>>
>> 2. Keeping the HA information relevant for a component (Dispatcher,
>> JobManager, ResourceManager) in a single ConfigMap sounds good. We should
>> check that we don't exceed the 1 MB size limit with this approach though.
>> The Dispatcher's ConfigMap would then contain the current leader, the
>> running jobs and the pointers to the persisted JobGraphs. The JobManager's
>> ConfigMap would then contain the current leader, the pointers to the
>> checkpoints and the checkpoint ID counter, for example.
>>
>> 3. Ah ok, I somehow thought that K8s would give us stronger
>> guarantees than Yarn in this regard. That's a pity.
>>
>> Cheers,
>> Till
>>
>> On Wed, Sep 30, 2020 at 10:03 AM tison <[hidden email]> wrote:
>>
>>> Thanks for your explanation. It would be fine as long as checking
>>> leadership and actually writing the information is atomic.
>>>
>>> Best,
>>> tison.
>>>
>>>
>>> On Wed, Sep 30, 2020 at 3:57 PM Yang Wang <[hidden email]> wrote:
>>>
>>>> Thanks Till and tison for your comments.
>>>>
>>>> @Till Rohrmann <[hidden email]>
>>>> 1. I am afraid we could not do this if we are going to use the fabric8
>>>> Kubernetes client SDK for the leader election. The official Kubernetes Java
>>>> client[1] does not support it either. We could only do it by implementing a new
>>>> LeaderElector in Flink based on the very basic Kubernetes API, but it seems
>>>> that we would not gain much from this.
>>>>
>>>> 2. Yes, the implementation will be a little complicated if we want to
>>>> completely eliminate the residual job graphs or checkpoints. Inspired by
>>>> your suggestion, a different solution has come to my mind. We could
>>>> use the same ConfigMap to store the JobManager leader, job graph,
>>>> checkpoint-counter, and checkpoint. Each job will have a specific ConfigMap for
>>>> the HA meta storage. Then it will be easier to guarantee that only the
>>>> leader could write the ConfigMap, since
>>>> “Get(check the leader)-and-Update(write back to the ConfigMap)” is a
>>>> transactional operation.
>>>>
>>>> 3. Yes, StatefulSet(1) + ConfigMap + HDFS/S3 is also a solution.
>>>> However, there is still a chance that two JobManagers are running and
>>>> trying to get/delete a key in the same ConfigMap concurrently. Imagine that
>>>> the kubelet (like the NodeManager in YARN) is down, so the JobManager pod
>>>> could not be deleted. A new JobManager pod will be launched. We are then in
>>>> a similar situation as with Deployment(1) + ConfigMap + HDFS/S3. The only
>>>> benefit is we do not need to implement a leader election/retrieval service.
>>>>
>>>> @tison
>>>> Actually, I do not think we will have such an issue in the Kubernetes HA
>>>> service. In the Kubernetes LeaderElector[2], the leader information is
>>>> stored in the annotation of the leader ConfigMap. So the old leader
>>>> could not wrongly override the leader information. Once a JobManager
>>>> wants to write its leader information to the ConfigMap, it will check
>>>> whether it is the leader now. If not, nothing will happen. Moreover, the
>>>> Kubernetes Resource Version[3] ensures that no one else has snuck in and
>>>> written a different update while the client was in the process of
>>>> performing its update.
>>>>
>>>>
>>>> [1].
>>>> https://github.com/kubernetes-client/java/blob/master/examples/src/main/java/io/kubernetes/client/examples/LeaderElectionExample.java
>>>> [2].
>>>> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java
>>>> <https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70>
>>>> [3].
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-Resourceversion
>>>>
>>>>
>>>> Best,
>>>> Yang
>>>>
>>>> On Wed, Sep 30, 2020 at 3:21 PM tison <[hidden email]> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Generally +1 for a native k8s HA service.
>>>>>
>>>>> For leader election & publishing leader information, there was a
>>>>> discussion[1] which pointed out that since these two actions are NOT
>>>>> atomic, there will always be an edge case where a previous leader
>>>>> overwrites the leader information, even with versioned writes. A versioned
>>>>> write only makes the writer read again if the version mismatches, so if we
>>>>> want versioned writes to work, the information in the kv pair should let
>>>>> the contender tell whether it is the current leader.
>>>>>
>>>>> The idea of writing the leader information on the contender node, or
>>>>> something equivalent, makes sense, but the details depend on how it is
>>>>> implemented. The general problems are:
>>>>>
>>>>> 1. The TM might be a bit late in picking up the correct leader
>>>>> information, but as long as the leader election process is short and
>>>>> leadership is stable most of the time, it won't be a serious issue.
>>>>> 2. The process by which the TM extracts the leader information might be
>>>>> a bit more complex than directly watching a fixed key.
>>>>>
>>>>> The atomicity issue can be addressed by leveraging low-level APIs such as
>>>>> lease & txn, but that requires more development effort. ConfigMap and the
>>>>> encapsulated interface, though, provide only a self-consistent mechanism,
>>>>> which doesn't promise more consistency for extensions.
>>>>>
>>>>> Best,
>>>>> tison.
>>>>>
>>>>> [1]
>>>>> https://lists.apache.org/x/thread.html/594b66ecb1d60b560a5c4c08ed1b2a67bc29143cb4e8d368da8c39b2@%3Cuser.zookeeper.apache.org%3E
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Sep 29, 2020 at 9:25 PM Till Rohrmann <[hidden email]> wrote:
>>>>>
>>>>>> For 1. I was wondering whether we can't write the leader connection
>>>>>> information directly when trying to obtain the leadership (trying to
>>>>>> update the leader key with one's own value)? This might be a little
>>>>>> detail, though.
>>>>>>
>>>>>> 2. Alright, so we are having a similar mechanism as we have in ZooKeeper
>>>>>> with the ephemeral lock nodes. I guess that this complicates the
>>>>>> implementation a bit, unfortunately.
>>>>>>
>>>>>> 3. Wouldn't the StatefulSet solution also work without a PV? One could
>>>>>> configure a different persistent storage like HDFS or S3 for storing the
>>>>>> checkpoints and job blobs like in the ZooKeeper case. The current benefit
>>>>>> I see is that we avoid having to implement this multi locking mechanism
>>>>>> in the ConfigMaps using the annotations because we can be sure that there
>>>>>> is only a single leader at a time if I understood the guarantees of K8s
>>>>>> correctly.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Tue, Sep 29, 2020 at 8:10 AM Yang Wang <[hidden email]>
>>>>>> wrote:
>>>>>>
>>>>>> > Hi Till, thanks for your valuable feedback.
>>>>>> >
>>>>>> > 1. Yes, leader election and storing leader information will use the
>>>>>> > same ConfigMap. When a contender successfully performs a versioned
>>>>>> > annotation update operation on the ConfigMap, it means that it has been
>>>>>> > elected as the leader. It will then write the leader information in the
>>>>>> > callback of the leader elector[1]. The Kubernetes resource version will
>>>>>> > help us prevent the leader ConfigMap from being wrongly updated.
>>>>>> >
>>>>>> > 2. The lock and release is really a valid concern. Actually, in the
>>>>>> > current design, we could not guarantee that the node that tries to write
>>>>>> > its ownership is the real leader. Whoever writes last becomes the owner.
>>>>>> > To address this issue, we need to store all the owners of the key. Only
>>>>>> > when the owner list is empty could the specific key (i.e. a checkpoint or
>>>>>> > job graph) be deleted. However, we may have a residual checkpoint or job
>>>>>> > graph when the old JobManager crashes unexpectedly and does not release
>>>>>> > the lock. To solve this problem completely, we need a timestamp renewal
>>>>>> > mechanism for CompletedCheckpointStore and JobGraphStore, which could
>>>>>> > help us detect the JobManager timeout and then clean up the residual
>>>>>> > keys.
>>>>>> >
>>>>>> > 3. Frankly speaking, I am not against this solution. However, in my
>>>>>> > opinion, it is more like a temporary proposal. We could use StatefulSet
>>>>>> > to avoid leader election and leader retrieval. But I am not sure whether
>>>>>> > the TaskManager could properly handle the situation where the same
>>>>>> > hostname resolves to different IPs because the JobManager failed and was
>>>>>> > relaunched. Also, we may still have two JobManagers running in some
>>>>>> > corner cases (e.g. the kubelet is down but the pod is running). Another
>>>>>> > concern is that we have a strong dependency on the PersistentVolume (aka
>>>>>> > PV) in FileSystemHAService, but a PV is not always available, especially
>>>>>> > in self-built Kubernetes clusters. Moreover, the PV provider should
>>>>>> > guarantee that each PV could only be mounted once. Since the native HA
>>>>>> > proposal could cover all the functionality of the StatefulSet proposal,
>>>>>> > I prefer the former.
>>>>>> >
>>>>>> >
>>>>>> > [1].
>>>>>> >
>>>>>> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70
>>>>>> >
>>>>>> > Best,
>>>>>> > Yang
>>>>>> >
>>>>>> > On Mon, Sep 28, 2020 at 9:29 PM Till Rohrmann <[hidden email]> wrote:
>>>>>> >
>>>>>> >> Thanks for creating this FLIP Yang Wang. I believe that many of
>>>>>> our users
>>>>>> >> will like a ZooKeeper-less HA setup.
>>>>>> >>
>>>>>> >> +1 for not separating the leader information and the leader
>>>>>> election if
>>>>>> >> possible. Maybe it is even possible that the contender writes his
>>>>>> leader
>>>>>> >> information directly when trying to obtain the leadership by
>>>>>> performing a
>>>>>> >> versioned write operation.
>>>>>> >>
>>>>>> >> Concerning the lock and release operation I have a question: Can
>>>>>> there be
>>>>>> >> multiple owners for a given key-value pair in a ConfigMap? If not,
>>>>>> how can
>>>>>> >> we ensure that the node which writes his ownership is actually the
>>>>>> leader
>>>>>> >> w/o transactional support from K8s? In ZooKeeper we had the same
>>>>>> problem
>>>>>> >> (we should probably change it at some point to simply use a
>>>>>> >> transaction which checks whether the writer is still the leader)
>>>>>> and
>>>>>> >> therefore introduced the ephemeral lock nodes. What they allow is
>>>>>> that
>>>>>> >> there can be multiple owners of a given ZNode at a time. The last
>>>>>> owner
>>>>>> >> will then be responsible for the cleanup of the node.
>>>>>> >>
>>>>>> >> I see the benefit of your proposal over the stateful set proposal
>>>>>> because
>>>>>> >> it can support multiple standby JMs. Given the problem of locking
>>>>>> key-value
>>>>>> >> pairs it might be simpler to start with this approach where we
>>>>>> only have
>>>>>> >> single JM. This might already add a lot of benefits for our users.
>>>>>> Was
>>>>>> >> there a specific reason why you discarded this proposal (other than
>>>>>> >> generality)?
>>>>>> >>
>>>>>> >> @Uce it would be great to hear your feedback on the proposal since
>>>>>> you
>>>>>> >> already implemented a K8s based HA service.
>>>>>> >>
>>>>>> >> Cheers,
>>>>>> >> Till
>>>>>> >>
>>>>>> >> On Thu, Sep 17, 2020 at 5:06 AM Yang Wang <[hidden email]>
>>>>>> wrote:
>>>>>> >>
>>>>>> >>> Hi Xintong and Stephan,
>>>>>> >>>
>>>>>> >>> Thanks a lot for your attention on this FLIP. I will address the
>>>>>> >>> comments inline.
>>>>>> >>>
>>>>>> >>> # Architecture -> One or two ConfigMaps
>>>>>> >>>
>>>>>> >>> Both of you are right. One ConfigMap will make the design and
>>>>>> >>> implementation easier. Actually, in my POC code, I am using just one
>>>>>> >>> ConfigMap (e.g. "k8s-ha-app1-restserver" for the rest server
>>>>>> >>> component) for the leader election and storage. Once a JobManager wins
>>>>>> >>> the election, it will update the ConfigMap with the leader address and
>>>>>> >>> periodically renew the lock annotation to remain the active leader. I
>>>>>> >>> will update the FLIP document, including the architecture diagram, to
>>>>>> >>> avoid the misunderstanding.
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> # HA storage > Lock and release
>>>>>> >>>
>>>>>> >>> This is a valid concern. ZooKeeper ephemeral nodes are deleted by the
>>>>>> >>> ZK server automatically when the client times out. This could happen
>>>>>> >>> in a bad network environment or when the ZK client crashes
>>>>>> >>> unexpectedly. For Kubernetes, we need to implement a similar mechanism.
>>>>>> >>> First, when we want to lock a specific key in the ConfigMap, we will
>>>>>> >>> put the owner identity, lease duration, and renew time in the ConfigMap
>>>>>> >>> annotation. The annotation will be cleaned up when releasing the lock.
>>>>>> >>> When we want to remove a job graph or checkpoints, one of the following
>>>>>> >>> conditions must hold. If not, the delete operation could not be done.
>>>>>> >>> * Current instance is the owner of the key.
>>>>>> >>> * The owner annotation is empty, which means the owner has released
>>>>>> >>> the lock.
>>>>>> >>> * The owner annotation timed out, which usually indicates the owner
>>>>>> >>> died.
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> # HA storage > HA data clean up
>>>>>> >>>
>>>>>> >>> Sorry that I did not clearly describe how the HA related ConfigMap is
>>>>>> >>> retained. Thanks to the Kubernetes OwnerReference[1], we set the owner
>>>>>> >>> of the flink-conf configmap, service and TaskManager pods to the
>>>>>> >>> JobManager Deployment. So when we want to destroy a Flink cluster, we
>>>>>> >>> just need to delete the deployment[2]. For the HA related ConfigMaps,
>>>>>> >>> we do not set the owner so that they could be retained even though we
>>>>>> >>> delete the whole Flink cluster.
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> [1].
>>>>>> >>>
>>>>>> https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
>>>>>> >>> [2].
>>>>>> >>>
>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#stop-flink-session
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> Best,
>>>>>> >>> Yang
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> On Wed, Sep 16, 2020 at 8:16 PM Stephan Ewen <[hidden email]> wrote:
>>>>>> >>>
>>>>>> >>>> This is a very cool feature proposal.
>>>>>> >>>>
>>>>>> >>>> One lesson learned from the ZooKeeper-based HA is that it is overly
>>>>>> >>>> complicated to have the Leader RPC address in a different node than
>>>>>> >>>> the LeaderLock. There is extra code needed to make sure these
>>>>>> >>>> converge, and they can be temporarily out of sync.
>>>>>> >>>>
>>>>>> >>>> A much easier design would be to have the RPC address as payload
>>>>>> in the
>>>>>> >>>> lock entry (ZNode in ZK), the same way that the leader fencing
>>>>>> token is
>>>>>> >>>> stored as payload of the lock.
>>>>>> >>>> I think for the design above it would mean having a single
>>>>>> ConfigMap
>>>>>> >>>> for both leader lock and leader RPC address discovery.
>>>>>> >>>>
>>>>>> >>>> This probably serves as a good design principle in general - not
>>>>>> divide
>>>>>> >>>> information that is updated together over different resources.
>>>>>> >>>>
>>>>>> >>>> Best,
>>>>>> >>>> Stephan
>>>>>> >>>>
>>>>>> >>>>
>>>>>> >>>> On Wed, Sep 16, 2020 at 11:26 AM Xintong Song <
>>>>>> [hidden email]>
>>>>>> >>>> wrote:
>>>>>> >>>>
>>>>>> >>>>> Thanks for preparing this FLIP, @Yang.
>>>>>> >>>>>
>>>>>> >>>>> In general, I'm +1 for this new feature. Leveraging Kubernetes's
>>>>>> >>>>> built-in ConfigMap for Flink's HA services should significantly
>>>>>> >>>>> reduce the maintenance overhead compared to deploying a ZK cluster. I
>>>>>> >>>>> think this is an attractive feature for users.
>>>>>> >>>>>
>>>>>> >>>>> Concerning the proposed design, I have some questions. Might
>>>>>> not be
>>>>>> >>>>> problems, just trying to understand.
>>>>>> >>>>>
>>>>>> >>>>> ## Architecture
>>>>>> >>>>>
>>>>>> >>>>> Why does the leader election need two ConfigMaps (`lock for
>>>>>> contending
>>>>>> >>>>> leader`, and `leader RPC address`)? What happens if the two
>>>>>> ConfigMaps are
>>>>>> >>>>> not updated consistently? E.g., a TM learns about a new JM
>>>>>> becoming leader
>>>>>> >>>>> (lock for contending leader updated), but still gets the old
>>>>>> leader's
>>>>>> >>>>> address when trying to read `leader RPC address`?
>>>>>> >>>>>
>>>>>> >>>>> ## HA storage > Lock and release
>>>>>> >>>>>
>>>>>> >>>>> It seems to me that the owner needs to explicitly release the
>>>>>> lock so
>>>>>> >>>>> that other peers can write/remove the stored object. What if
>>>>>> the previous
>>>>>> >>>>> owner failed to release the lock (e.g., dead before releasing)?
>>>>>> Would there
>>>>>> >>>>> be any problem?
>>>>>> >>>>>
>>>>>> >>>>> ## HA storage > HA data clean up
>>>>>> >>>>>
>>>>>> >>>>> If the ConfigMap is destroyed on `kubectl delete deploy
>>>>>> >>>>> <ClusterID>`, how is the HA data retained?
>>>>>> >>>>>
>>>>>> >>>>>
>>>>>> >>>>> Thank you~
>>>>>> >>>>>
>>>>>> >>>>> Xintong Song
>>>>>> >>>>>
>>>>>> >>>>>
>>>>>> >>>>>
>>>>>> >>>>> On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <
>>>>>> [hidden email]>
>>>>>> >>>>> wrote:
>>>>>> >>>>>
>>>>>> >>>>>> Hi devs and users,
>>>>>> >>>>>>
>>>>>> >>>>>> I would like to start the discussion about FLIP-144[1], which
>>>>>> >>>>>> will introduce a new native high availability service for
>>>>>> >>>>>> Kubernetes.
>>>>>> >>>>>>
>>>>>> >>>>>> Currently, Flink provides a ZooKeeper HA service, which is widely
>>>>>> >>>>>> used in production environments. It can be integrated into
>>>>>> >>>>>> standalone cluster, YARN, and Kubernetes deployments. However,
>>>>>> >>>>>> using the ZooKeeper HA in K8s incurs additional cost since we need
>>>>>> >>>>>> to manage a ZooKeeper cluster. In the meantime, K8s provides public
>>>>>> >>>>>> APIs for leader election[2] and configuration storage (i.e.
>>>>>> >>>>>> ConfigMap[3]). We could leverage these features to make running an
>>>>>> >>>>>> HA-configured Flink cluster on K8s more convenient.
>>>>>> >>>>>>
>>>>>> >>>>>> Both standalone on K8s and native K8s could benefit from the newly
>>>>>> >>>>>> introduced KubernetesHaService.
>>>>>> >>>>>>
>>>>>> >>>>>> [1].
>>>>>> >>>>>>
>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
>>>>>> >>>>>> [2].
>>>>>> >>>>>>
>>>>>> https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
>>>>>> >>>>>> [3].
>>>>>> https://kubernetes.io/docs/concepts/configuration/configmap/
>>>>>> >>>>>>
>>>>>> >>>>>> Looking forward to your feedback.
>>>>>> >>>>>>
>>>>>> >>>>>> Best,
>>>>>> >>>>>> Yang
>>>>>> >>>>>>
>>>>>> >>>>>
>>>>>>
>>>>>