Re: Flink Jobmanager HA deployment on k8s

Posted by Yang Wang on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Flink-Jobmanager-HA-deployment-on-k8s-tp40827p40867.html

Hi Chirag Dewan,

Yes, we could have multiple replicas with ZK HA in K8 as well. Multiple JobManagers will contend for
a leader and then write its rpc address to the ZooKeeper nodes. You could find more information how the
HA service works here[1]. It is about the KubernetesHAService, but the ZooKeeperHAService has the same
mechanism.

In such a case, I strongly suggest not using the service as the JobManager rpc address. Otherwise, we
will have the issue you have mentioned. There are 3 replicas behind the same service endpoint and only
one of them is the leader. TaskManager/Client do not know how to contact the leader.

Instead, I suggest not creating the internal service and bind the pod ip to the JobManager rpc address.
After then, TaskManager/Client will retrieve the leader address(pod ip + port) and contact via such an address.

Please find more information and the example here[1].

[1]. https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
[2]. https://issues.apache.org/jira/browse/FLINK-20982?focusedCommentId=17265715&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17265715

Best,
Yang


Amit Bhatia <[hidden email]> 于2021年1月20日周三 下午12:27写道:
  Hi Yang,

I tried the deployment of flink with three replicas of Jobmanger to test a faster job recovery scenario.  Below is my deployment :

 $ kubectl get po -namit | grep zk
eric-data-coordinator-zk-0                                        1/1     Running            0          6d21h
eric-data-coordinator-zk-1                                        1/1     Running            0          6d21h
eric-data-coordinator-zk-2                                        1/1     Running            0          6d21h
flink-jobmanager-ha-zk-1-5d58dc469-8bjpb                          1/1     Running            0          19h
flink-jobmanager-ha-zk-1-5d58dc469-klg5p                          1/1     Running            0          19h
flink-jobmanager-ha-zk-1-5d58dc469-kvwzk                          1/1     Running            0          19h


 $ kubectl get svc -namit | grep zk
flink-jobmanager-ha-rest-zk1                NodePort       10.100.118.186   <none>        8081:32115/TCP                                 21h
flink-jobmanager-ha-zk1                     ClusterIP      10.111.135.174   <none>        6123/TCP,6124/TCP,8081/TCP                     21h
eric-data-coordinator-zk                    ClusterIP      10.105.139.167   <none>        2181/TCP,8080/TCP,21007/TCP                    7d20h
eric-data-coordinator-zk-ensemble-service   ClusterIP      None             <none>        2888/TCP,3888/TCP                              7d20h

Flink Configmap:
====================
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config-ha-zk-1
  namespace: amit
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager-ha-zk1
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
    parallelism.default: 2
    # High Availability parameters
    high-availability: zookeeper
    high-availability.cluster-id: /haclusterzk1
    high-availability.storageDir: file:///opt/flink/recovery/
    high-availability.zookeeper.path.root: /flinkhazk
    high-availability.zookeeper.quorum: eric-data-coordinator-zk:2181
    high-availability.jobmanager.port: 6123
===============================================================

Out of the three replicas of Job manager pods in one of the pod i am getting this error:

2021-01-19 08:18:33,982 INFO  org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService [] - Starting ZooKeeperLeaderElectionService ZooKeeperLeaderElectionService{leaderPath='/leader/resource_manager_lock'}.
2021-01-19 08:21:39,381 WARN  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever [] - Error while retrieving the leader gateway. Retrying to connect to akka.tcp://flink@flink-jobmanager-ha-zk1:6123/user/rpc/dispatcher_1.
2021-01-19 08:21:42,521 WARN  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever [] - Error while retrieving the leader gateway. Retrying to connect to akka.tcp://flink@flink-jobmanager-ha-zk1:6123/user/rpc/dispatcher_1.
2021-01-19 08:21:45,508 WARN  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever [] - Error while retrieving the leader gateway. Retrying to connect to akka.tcp://flink@flink-jobmanager-ha-zk1:6123/user/rpc/dispatcher_1.
2021-01-19 08:21:46,369 WARN  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever [] - Error while retrieving the leader gateway. Retrying to connect to akka.tcp://flink@flink-jobmanager-ha-zk1:6123/user/rpc/dispatcher_1.
2021-01-19 08:22:13,658 WARN  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever [] - Error while retrieving the leader gateway. Retrying to connect to akka.tcp://flink@flink-jobmanager-ha-zk1:6123/user/rpc/dispatcher_1.
2021-01-20 04:10:39,836 WARN  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever [] - Error while retrieving the leader gateway. Retrying to connect to akka.tcp://flink@flink-jobmanager-ha-zk1:6123/user/rpc/dispatcher_1.


And when trying to access the GUI getting below error:

image.png

In zookeeper i could see all the three id's are there

[zk: localhost:2181(CONNECTED) 5] ls /flinkhazk/haclusterzk1/leaderlatch/dispatcher_lock
[_c_1d5fc8b1-063f-4a1c-ad0f-ec46b6f10f36-latch-0000000020, _c_229d0739-8854-4a5a-ace7-377d9edc575f-latch-0000000018, _c_4eac3aaf-3f0f-4297-ac7f-086821548697-latch-0000000019]
[zk: localhost:2181(CONNECTED) 6]

So i have below queries on this:

1) what is the correct way to start three jobmanager replicas with zk ? Is there any link which explains this deployment scenario and configuration ?

2) How we'll identify that out of three replicas, which Job Manager replica is the leader ?

Regards,
Amit Bhatia


On Wed, Jan 20, 2021 at 9:44 AM Chirag Dewan <[hidden email]> wrote:
Hi,

Can we have multiple replicas with ZK HA in K8 as well?
In this case, how does Task Managers and clients recover the Job Manager RPC address? Are they updated in ZK?
Also, since there are 3 replicas behind the same service endpoint and only one of them is the leader, how should clients reach the leader Job Manager?

On Wednesday, 20 January, 2021, 07:41:20 am IST, Yang Wang <[hidden email]> wrote:


If you do not want to run multiple JobManagers simultaneously, then I think the "Job" for application cluster
with HA enable is enough.
K8s will also launch a new pod/container when the old one terminated exceptionally.

Best,
Yang

Yang Wang <[hidden email]> 于2021年1月20日周三 上午10:08写道:
Yes. Using a "Deployment" instead of "Job" for the application cluster also makes sense.
Actually, in the native K8s integration, we always use the deployment for JobManager.

But please note that the deployment may relaunch the JobManager pod even though you cancel
the Flink job.

Best,
Yang

Ashish Nigam <[hidden email]> 于2021年1月20日周三 上午5:29写道:
Yang,
For Application clusters, does it make sense to deploy JobManager as "Deployment" rather than as a "Job", as suggested in docs?
I am asking this because I am thinking of deploying a job manager in HA mode even for application clusters.

Thanks
Ashish


On Tue, Jan 19, 2021 at 6:16 AM Yang Wang <[hidden email]> wrote:
Usually, you do not need to start multiple JobManager simultaneously. The JobManager is a deployment.
A new one pod/container will be launched once it terminated exceptionally. 

If you still want to start multiple JobManagers to get a faster recovery, you could set the replica greater than 1
for standalone cluster on K8s[1]. For native integration[2], we still have not supported such configuration[2].

Please note that the key point to enable HA is not start multiple JobManagers simultaneously or sequently.
You need to set the ZooKeeperHAService[4] or KubernetesHAService[5] to ensure the Flink job could recover
from latest successful checkpoint.


Best,
Yang

Amit Bhatia <[hidden email]> 于2021年1月19日周二 下午8:45写道:
Hi,

I am deploying Flink 1.12 on K8s. Can anyone confirm if we can deploy multiple job manager pods in K8s for HA or it should always be only a single job manager pod ?

Regards,
Amit Bhatia