Hi Community, Yang,

I am new to Flink on native Kubernetes and I am trying to do a POC for native Kubernetes application mode on Oracle Cloud Infrastructure. I was following the documentation here step by step: [1]. I am using Flink 1.12.1, Scala 2.11, Java 11.

I was able to create a native Kubernetes deployment, but I am not able to use any further commands like list / cancel etc.; I always run into a timeout error. I think the issue could be that the JobManager web interface IP address printed after job deployment is not accessible. This issue prevents me from shutting down the deployment with a savepoint. It could be a Kubernetes configuration issue. I have opened traffic on all related ports and validated the security list, but still couldn't make it work. Any help is appreciated. The relevant Flink source code is the CliFrontend.java class [2]. The ./bin/flink list and cancel commands try to send traffic to the Flink dashboard UI IP address and time out. I tried both the LoadBalancer and NodePort options for the -Dkubernetes.rest-service.exposed.type configuration. Neither of them works.

# List running jobs on the cluster (I can't execute this command successfully due to timeout, logs shared below)
$ ./bin/flink list --target kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster

# Cancel a running job (I can't execute this command successfully)
$ ./bin/flink cancel --target kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster <jobId>

I think those commands need to communicate with the endpoint that is shown after the job submission command.
# fuyli @ fuyli-mac in ~/Development/flink-1.12.1 [17:59:00] C:127
$ ./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.container.image=us-phoenix-1.ocir.io/idxglh0bz964/flink-demo:21.3.1 \
    -Dkubernetes.container.image.pull-policy=IfNotPresent \
    -Dkubernetes.container.image.pull-secrets=ocirsecret \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    -Dkubernetes.service-account=flink-service-account \
    local:///opt/flink/usrlib/quickstart-0.1.jar

When the exposed type is NodePort, the printed message says the Flink JobManager web interface is at http://192.29.104.156:30996. 192.29.104.156 is my Kubernetes API server address, and 30996 is the port that exposes the service. However, the Flink dashboard at this address is not reachable. I can only access the dashboard UI on each node IP address (there are three nodes in my K8s cluster):

100.104.154.73:30996
100.104.154.74:30996
100.104.154.75:30996

I got the following errors when trying to run the list command against such a native Kubernetes deployment. See [4].
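To see what the NodePort setup actually exposes, the generated Service can be inspected directly. A minimal sketch, assuming the rest service follows Flink's default `<cluster-id>-rest` naming:

```shell
# Inspect the rest service created for the application cluster;
# native K8s mode names it <cluster-id>-rest by default
kubectl get svc my-first-application-cluster-rest

# Show which pod/node addresses actually back the service,
# to compare against the URL printed by the Flink client
kubectl get endpoints my-first-application-cluster-rest
```

Comparing the endpoints against the printed URL shows whether the client is being pointed at an address that actually serves the rest endpoint.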
According to the documentation here [3], this shouldn't happen, since the Kubernetes API server address should also serve the Flink web UI... Did I miss any Kubernetes configuration needed to make the web UI available at the Kubernetes API server address?
# fuyli @ fuyli-mac in ~/Development/flink-1.12.1 [17:59:00] C:127
$ ./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.container.image=us-phoenix-1.ocir.io/idxglh0bz964/flink-demo:21.3.1 \
    -Dkubernetes.container.image.pull-policy=IfNotPresent \
    -Dkubernetes.container.image.pull-secrets=ocirsecret \
    -Dkubernetes.rest-service.exposed.type=LoadBalancer \
    -Dkubernetes.service-account=flink-service-account \
    local:///opt/flink/usrlib/quickstart-0.1.jar

After a while, when the external IP was resolved, it said the Flink JobManager web interface is at the external IP (load balancer address): http://144.25.13.78:8081. When I execute the list command, I still get an error after waiting long enough for it to time out. See the errors here: [5]. I can still access NodeIP:<service-port>. In this case, I tend to believe it is a network issue, but I am still quite confused since I have already opened all the traffic.

Reference:
[4] https://pastebin.ubuntu.com/p/WcJMwds52r/
[5] https://pastebin.ubuntu.com/p/m27BnQGXQc/

Thanks for your help in advance.

Best regards,
Fuyao
Hi Fuyao,

Thanks for trying the native Kubernetes integration. As you know, the Flink rest service can be exposed in the following three types, configured via "kubernetes.rest-service.exposed.type".

* ClusterIP, which means you can only access the Flink rest endpoint inside the K8s cluster. Simply put, users can start a Flink client in the K8s cluster via the following yaml file, and use "kubectl exec" to tunnel into the pod to create a Flink session/application cluster. "flink list/cancel" also works well this way.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-client
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink-client
  template:
    metadata:
      labels:
        app: flink-client
    spec:
      containers:
      - name: client
        image: flink:1.12.2
        imagePullPolicy: Always
        args: ["sleep", "86400"]

* NodePort. Currently, we have a limitation that only the Kubernetes master nodes can be used to build the Flink exposed rest endpoint. So if your API server node does not have the kube proxy, then the printed URL in the Flink client logs cannot be used. We already have a ticket [1] to support using one of the worker nodes for accessing the rest endpoint, but I have not managed to get it done yet.

* LoadBalancer. Is the resolved rest endpoint "http://144.25.13.78:8081/" accessible from your Flink client side? If yes, then the Flink client should be able to contact the JobManager rest server to list/cancel the jobs. I have verified this in Alibaba container service, and it works well.

Best,
Yang

Fuyao Li <[hidden email]> wrote on Sat, Mar 27, 2021, 5:59 AM:
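Using the client deployment above involves applying it and then exec'ing into the resulting pod. A minimal sketch, assuming the manifest is saved as flink-client.yaml (the file name and pod name are illustrative):

```shell
# Create the client deployment inside the cluster
kubectl apply -f flink-client.yaml

# Find the generated pod name (it will have a random suffix)
kubectl get pods -l app=flink-client

# Open a shell in the client pod; from inside the cluster the
# Flink CLI can reach a ClusterIP rest service directly
kubectl exec -it <flink-client-pod-name> -- bash
```

From that shell, `./bin/flink list --target kubernetes-application -Dkubernetes.cluster-id=<cluster-id>` should resolve the rest endpoint without leaving the cluster network.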
Hello Yang,

Thank you so much for providing me the flink-client.yaml. I was able to make some progress. I didn't realize I should create a new flink-client pod to list/cancel jobs; I was trying to do such a thing from my local laptop. Maybe that is the reason why it doesn't work. However, I still have several questions.

I created the deployment based on your flink-client.yaml. For the LoadBalancer mode, after applying the cluster role binding yaml below
# https://kubernetes.io/docs/reference/access-authn-authz/rbac/
and executing the command:

kubectl create clusterrolebinding service-reader-pod --clusterrole=service-reader --serviceaccount=default:default

I am able to exec into the flink-client pod and list/cancel jobs:

$ kubectl exec -it flink-client-776886cf4f-9h47f bash
kubectl exec [POD] [COMMAND] is DEPRECATED and will be removed in a future version. Use kubectl exec [POD] -- [COMMAND] instead.
root@flink-client-776886cf4f-9h47f:/opt/flink# ./bin/flink list --target kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster
2021-03-30 21:53:14,513 INFO  org.apache.flink.kubernetes.KubernetesClusterDescriptor [] - Retrieve flink cluster my-first-application-cluster successfully, JobManager Web Interface: http://144.25.13.78:8081
Waiting for response...
------------------ Running/Restarting Jobs -------------------
24.03.2021 00:13:04 : eea39629a1931b67eb395207739455ce : Flink Streaming Java API Skeleton (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
root@flink-client-776886cf4f-9h47f:/opt/flink# ping 144.25.13.78
PING 144.25.13.78 (144.25.13.78) 56(84) bytes of data.
^C
--- 144.25.13.78 ping statistics ---
31 packets transmitted, 0 received, 100% packet loss, time 772ms

Questions:
1. The Flink client is able to list/cancel jobs, so based on the logs above I should be able to reach 144.25.13.78. Why can I still NOT ping that address?
2. Why is 144.25.13.78:8081 not accessible from outside, i.e. in my laptop's browser? I am within the company's VPN and such a public load balancer should expose the Flink web UI, right? I tried to debug the network configuration but failed to find a reason; could you give me some hints?
3. In production, what is the suggested approach to list and cancel jobs? The current manual work of "kubectl exec" into pods is not very reliable. How can this process be automated and integrated into CI/CD? Please share some blogs if there are any, thanks.
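The clusterrolebinding command above references a ClusterRole named service-reader, which has to exist first. A minimal sketch of creating such a role; the exact verb/resource set is an assumption, granting only the read access to Services that the Flink client needs to resolve the rest endpoint:

```shell
# Create a "service-reader" ClusterRole allowing read-only access
# to Service objects; the binding then grants it to the default
# service account used by the flink-client pod
kubectl create clusterrole service-reader \
  --verb=get,list,watch \
  --resource=services
```

A narrower Role scoped to the Flink namespace would also work and is preferable in production.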
Best,
Fuyao
Hi Fuyao,

Thanks for sharing the progress.

1. The Flink client is able to list/cancel jobs, so based on the logs shared above I should be able to reach 144.25.13.78. Why can I still NOT ping that address?

I think this is an environment problem. Actually, not every IP address can be tested with the "ping" command, since many load balancers drop ICMP traffic. I suggest you use "telnet 144.25.13.78 8081" to check the network connectivity.

2. Why is 144.25.13.78:8081 not accessible from outside, i.e. in my laptop's browser? I am within the company's VPN and such a public load balancer should expose the Flink web UI, right? I tried to debug the network configuration but failed to find a reason; could you give me some hints?

Just like my answer above, I think you need to check the network connectivity via "telnet 144.25.13.78 8081". Maybe the firewall does not allow connections from your local machine (e.g. your local IP is not in the white list of the LoadBalancer).

3. In production, what is the suggested approach to list and cancel jobs? The current manual work of "kubectl exec" into pods is not very reliable. How can this be automated and integrated into CI/CD?

I think in a production environment you should have your own deployer, which takes care of submitting the jobs and listing/cancelling them. The deployer could even trigger savepoints and manage the whole lifecycle of Flink applications. I once developed a PoC native-flink-k8s-operator [1]. It could be a starting point for your own deployer if you want to develop it in Java.

Best,
Yang

Fuyao Li <[hidden email]> wrote on Wed, Mar 31, 2021, 6:37 AM:
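The connectivity checks suggested above can be run from inside the client pod or from the laptop. A sketch of a few equivalent checks (note telnet takes host and port as separate arguments; nc and curl are alternatives when telnet is not installed in the image):

```shell
# TCP-level check: does the rest endpoint accept connections?
telnet 144.25.13.78 8081

# Equivalent check with netcat, often available when telnet is not
nc -zv 144.25.13.78 8081

# HTTP-level check: query the Flink REST API's cluster overview
curl http://144.25.13.78:8081/overview
```

If the TCP check succeeds but ping fails, the endpoint is healthy and only ICMP is being filtered; if the TCP check itself times out, the problem is a firewall or security-list rule between the client and the load balancer.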
Hi Yang,

Thanks for sharing the insights.

For problem 1: I think I can't run telnet in the container. I tried curl 144.25.13.78:8081 and I could see the HTML of the Flink dashboard UI. This proves such a public IP is reachable inside the cluster. Just as you mentioned, there might still be some network issues with the cluster. I will do some further checking.

For problem 2: I created a new K8s cluster with a bastion server that has a public IP assigned to it. Finally, I can see something valid from my browser. (There still exist some problems with connecting to some databases, but I think these network problems are not directly related to Flink; I can investigate them later.)

For problem 3: Thanks for sharing the repo you created. I am not sure how much work it would take to develop a deployer. I understand it depends on proficiency, but could you give a rough estimate? If it is too complicated and some other options are not significantly inferior to native Kubernetes, I might prefer to choose another option. I am currently comparing different options for deploying on Kubernetes.

I also watched the demo video you presented [3]. I noticed you mentioned that native K8s is not going to replace the other two options. I still don't fully get your idea from the limited explanation in the demo. Could you compare the trade-offs a little bit? Thanks!

[1] https://github.com/GoogleCloudPlatform/flink-on-k8s-operator
[2] https://github.com/lyft/flinkk8soperator
[3] https://youtu.be/pdFPr_VOWTU

Best,
Fuyao
Hello Yang,

I am just following up on the previous email to see if you have had time to reply. I also took a deeper look into the lyft k8s operator recently. It seems it doesn't support HA natively; it still needs the help of ZooKeeper. In that respect, native K8s is better. Any other ideas? Thanks for your help.

Best,
Fuyao
Hi Fuyao,

Sorry for the late reply.

It is not very hard to develop your own deployer. It took me about 3 days to develop the PoC version of flink-native-k8s-operator. So if you want to have a fully functional K8s operator, maybe two weeks is enough. But if you want to put it into production, you may need some more time to polish it for easier use.

Flink's native K8s integration is not going to replace the standalone mode. First, not all Flink standalone clusters are running on K8s. And standalone mode works really well with reactive mode [1].

Flink's native K8s integration is not going to replace the K8s operator either. Actually, a Flink K8s operator is not on the same level as the native integration: the operator is responsible for managing the lifecycle of a Flink application and for making submission more K8s-style. The Google and Lyft Flink K8s operators could support native mode; they just do not have that support right now.

Kubernetes HA works for both standalone mode and native mode. You can find the configuration here [2]. However, you might need some changes to the Flink K8s operator to make it work, because we need to add more args (e.g. --host) to the JobManager start command.

Best,
Yang

Fuyao Li <[hidden email]> wrote on Mon, Apr 5, 2021, 1:33 PM:
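The Kubernetes HA setup mentioned above is enabled purely through Flink configuration options. A minimal sketch for application mode on Flink 1.12; the bucket name and storage path are placeholders, and the remaining options match the earlier submission commands in this thread:

```shell
$ ./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.service-account=flink-service-account \
    -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
    -Dhigh-availability.storageDir=s3://<your-bucket>/flink-ha \
    local:///opt/flink/usrlib/quickstart-0.1.jar
```

With this enabled, JobManager metadata is persisted to the storage directory and leader election is handled via Kubernetes ConfigMaps, so no ZooKeeper quorum is needed.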
Hi Yang,

Thanks for the reply; that information is very helpful.

Best,
Fuyao