Need help with executing Flink CLI for native Kubernetes deployment



Fuyao Li-2

Hi Community, Yang,

 

I am new to Flink on native Kubernetes and I am trying to do a POC for native Kubernetes application mode on Oracle Cloud Infrastructure. I was following the documentation here step by step: [1]

 

I am using Flink 1.12.1, Scala 2.11, and Java 11.

I was able to create a native Kubernetes deployment, but I cannot run any further commands such as list or cancel; they always time out. I think the issue is that the JobManager Web Interface address printed after job deployment is not accessible. As a result, I cannot shut down the deployment with a savepoint. It could be a Kubernetes configuration issue. I have opened all the related ports and validated the security list, but still couldn’t make it work. Any help is appreciated.

 

 

The relevant Flink source code is the CliFrontend.java class [2].

The ./bin/flink list and cancel commands try to send traffic to the Flink dashboard UI IP address and time out. I tried both the LoadBalancer and NodePort options for the -Dkubernetes.rest-service.exposed.type configuration. Neither of them works.

 

# List running jobs on the cluster (this command times out for me; logs are shared below)
$ ./bin/flink list --target kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster

# Cancel a running job (this command also fails for me)
$ ./bin/flink cancel --target kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster <jobId>

 

I think those commands need to communicate with the endpoint that is printed after the job submission command.
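Since the stated goal is to shut the deployment down with a savepoint, the same target flags used for list and cancel above would carry over to `flink stop --savepointPath`. The following is a minimal sketch, not a verified recipe: `flink_k8s`, `stop_with_savepoint` and the `DRY_RUN` switch are illustrative helper names (not part of Flink), and the savepoint directory is assumed to be a location the JobManager can write to.

```shell
#!/bin/sh
# Illustrative wrapper; CLUSTER_ID defaults to the cluster id used in this thread.
CLUSTER_ID="${CLUSTER_ID:-my-first-application-cluster}"

# Run (or, with DRY_RUN=1, only print) a Flink CLI action against the
# native Kubernetes application cluster.
flink_k8s() {
    action="$1"; shift
    set -- ./bin/flink "$action" \
        --target kubernetes-application \
        "-Dkubernetes.cluster-id=${CLUSTER_ID}" "$@"
    if [ "${DRY_RUN:-0}" = "1" ]; then
        echo "$*"
    else
        "$@"
    fi
}

# Stop a job gracefully and write a savepoint to the given directory.
stop_with_savepoint() {
    job_id="$1"; savepoint_dir="$2"
    flink_k8s stop --savepointPath "$savepoint_dir" "$job_id"
}

# Example (prints the command instead of running it):
#   DRY_RUN=1 stop_with_savepoint <jobId> /tmp/flink-savepoints
```

Like list and cancel, this still needs a reachable REST endpoint, so it only helps once the connectivity problem described here is solved.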

 

  1. Use case 1 (deploy with NodePort)

 

# fuyli @ fuyli-mac in ~/Development/flink-1.12.1 [17:59:00] C:127

$ ./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.container.image=us-phoenix-1.ocir.io/idxglh0bz964/flink-demo:21.3.1 \
    -Dkubernetes.container.image.pull-policy=IfNotPresent \
    -Dkubernetes.container.image.pull-secrets=ocirsecret \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    -Dkubernetes.service-account=flink-service-account \
    local:///opt/flink/usrlib/quickstart-0.1.jar

 

 

When the exposed type is NodePort, the printed message says the Flink JobManager Web Interface is at http://192.29.104.156:30996. 192.29.104.156 is my Kubernetes API server address, and 30996 is the port that exposes the service. However, the Flink dashboard is not reachable at this address.

I can only access the dashboard UI on each node's IP address (there are three nodes in my K8s cluster):

100.104.154.73:30996

100.104.154.74:30996

100.104.154.75:30996

I got the errors in [4] when running the list command against this native Kubernetes deployment. According to the documentation [3], this shouldn’t happen, since the Kubernetes API server address should also serve the Flink Web UI. Did I miss any Kubernetes configuration needed to make the Web UI available at the API server address?
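Because the dashboard answers on the node IPs but not on the printed API server address, one way to narrow this down is to probe each node on the NodePort and report the first one that responds. A rough sketch, assuming curl is available on the client machine; `probe`, `first_reachable` and `PROBE_CMD` are illustrative names, and `/overview` is the Flink REST endpoint used as the health probe.

```shell
#!/bin/sh
# Return success if a Flink REST endpoint answers at host:port within 3 seconds.
# Set PROBE_CMD to the name of another command or function to substitute the check.
probe() {
    if [ -n "${PROBE_CMD:-}" ]; then
        "$PROBE_CMD" "$1" "$2"
    else
        curl --silent --fail --max-time 3 --output /dev/null "http://$1:$2/overview"
    fi
}

# Print the first host:port that responds; return 1 if none does.
first_reachable() {
    port="$1"; shift
    for host in "$@"; do
        if probe "$host" "$port"; then
            echo "$host:$port"
            return 0
        fi
    done
    return 1
}

# Example with the node IPs from this message:
#   first_reachable 30996 100.104.154.73 100.104.154.74 100.104.154.75
```

The resulting address can be opened in a browser. Whether the CLI can be pointed at it directly (for example through its `-m host:port` option) is an assumption that has not been checked against this setup.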

 

 

  2. Use case 2 (deploy with LoadBalancer)

# fuyli @ fuyli-mac in ~/Development/flink-1.12.1 [17:59:00] C:127

$ ./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.container.image=us-phoenix-1.ocir.io/idxglh0bz964/flink-demo:21.3.1 \
    -Dkubernetes.container.image.pull-policy=IfNotPresent \
    -Dkubernetes.container.image.pull-secrets=ocirsecret \
    -Dkubernetes.rest-service.exposed.type=LoadBalancer \
    -Dkubernetes.service-account=flink-service-account \
    local:///opt/flink/usrlib/quickstart-0.1.jar

 

 

After a while, once the external IP was resolved, the output said the Flink JobManager web interface is at the external IP (the load balancer address): http://144.25.13.78:8081

When I execute the list command, I still get an error after waiting a long time for it to time out. See the errors here: [5]

I can still access NodeIP:<service-port>. Given that, I tend to believe it is a network issue, but I am still quite confused, since I have already opened all the traffic.

 

 

 

 

Reference:

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html

[2] https://github.com/apache/flink/blob/f3155e6c0213de7bf4b58a89fb1e1331dee7701a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java

[3] https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html#accessing-flinks-web-ui

[4] https://pastebin.ubuntu.com/p/WcJMwds52r/

[5] https://pastebin.ubuntu.com/p/m27BnQGXQc/

 

 

Thanks for your help in advance.

 

Best regards,

Fuyao

 

 


Re: Need help with executing Flink CLI for native Kubernetes deployment

Yang Wang
Hi Fuyao,

Thanks for trying the native Kubernetes integration.

As you know, the Flink REST service can be exposed in the following three ways, configured via "kubernetes.rest-service.exposed.type".

* ClusterIP, which means the Flink REST endpoint is only accessible inside the K8s cluster. In this case, users can start a Flink client inside the
K8s cluster using the following YAML file, then use "kubectl exec" to enter the pod and create a Flink session/application cluster from there.
"flink list/cancel" also work from inside the pod.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-client
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink-client
  template:
    metadata:
      labels:
        app: flink-client
    spec:
      containers:
      - name: client
        image: flink:1.12.2
        imagePullPolicy: Always
        args: ["sleep", "86400"]


* NodePort
Currently, there is a limitation: only the Kubernetes master nodes are used to build the exposed Flink REST endpoint. So if your
API server node does not run kube-proxy, the URL printed in the Flink client logs cannot be used. We already have a ticket[1] to
support using one of the worker nodes to access the REST endpoint, but I have not managed to get it done yet.

* LoadBalancer
Is the resolved REST endpoint "http://144.25.13.78:8081/" accessible from your Flink client side? If so, I think the Flink client
should be able to contact the JobManager REST server to list/cancel the jobs. I have verified this on Alibaba container service, and it works well.
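With the flink-client Deployment above running inside the cluster, the CLI can also be invoked through `kubectl exec` without opening an interactive shell. A minimal sketch under a few assumptions: the manifest was applied as-is, the pod's service account is allowed to read services in the namespace, and `flink_in_client_pod` and the `KUBECTL` override are illustrative names rather than anything provided by Flink or Kubernetes.

```shell
#!/bin/sh
# Set KUBECTL=echo to preview the command instead of running it.
KUBECTL="${KUBECTL:-kubectl}"

# Run a Flink CLI action inside the client pod.
# "deploy/flink-client" lets kubectl pick a pod that belongs to the Deployment,
# and "--" separates kubectl's own flags from the command run in the container.
flink_in_client_pod() {
    cluster_id="$1"; action="$2"; shift 2
    $KUBECTL exec deploy/flink-client -- ./bin/flink "$action" \
        --target kubernetes-application \
        "-Dkubernetes.cluster-id=${cluster_id}" "$@"
}

# Examples:
#   flink_in_client_pod my-first-application-cluster list
#   flink_in_client_pod my-first-application-cluster cancel <jobId>
```

The relative path `./bin/flink` relies on the container starting in the Flink home directory.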




Best,
Yang


Re: [External] : Re: Need help with executing Flink CLI for native Kubernetes deployment

Fuyao Li-2

Hello Yang,

 

Thank you so much for providing the flink-client.yaml. I was able to make some progress. I didn’t realize I should create a new flink-client pod to list/cancel jobs. I was trying to do that from my local laptop, which may be why it didn’t work. However, I still have several questions.

 

I created the deployment based on your flink-client.yaml

For the LoadBalancer mode:

 

After applying the ClusterRole YAML below:

 

# https://kubernetes.io/docs/reference/access-authn-authz/rbac/
# https://stackoverflow.com/questions/47973570/kubernetes-log-user-systemserviceaccountdefaultdefault-cannot-get-services
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  namespace: default
  name: service-reader
rules:
- apiGroups: [""] # "" indicates the core API group
  resources: ["services"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]

 

And execute the command:

kubectl create clusterrolebinding service-reader-pod  --clusterrole=service-reader  --serviceaccount=default:default
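Whether the binding took effect can be checked from outside the pod with `kubectl auth can-i` and impersonation. A sketch, assuming the caller is itself permitted to impersonate service accounts; `sa_subject`, `check_service_access` and the `KUBECTL` override are illustrative names.

```shell
#!/bin/sh
# Set KUBECTL=echo to preview the commands instead of running them.
KUBECTL="${KUBECTL:-kubectl}"

# Build the RBAC user name of a service account.
sa_subject() {
    echo "system:serviceaccount:$1:$2"
}

# Ask the API server whether the service account may perform each verb on services.
check_service_access() {
    namespace="$1"; account="$2"; shift 2
    for verb in "$@"; do
        printf '%s services: ' "$verb"
        $KUBECTL auth can-i "$verb" services \
            --namespace "$namespace" \
            --as "$(sa_subject "$namespace" "$account")"
    done
}

# Example for the default service account bound above:
#   check_service_access default default get list watch
```

Each line of output should end in "yes" once the ClusterRoleBinding is in place.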

 

I am then able to exec into the flink-client pod and list/cancel jobs.

 

$ kubectl exec -it flink-client-776886cf4f-9h47f bash

kubectl exec [POD] [COMMAND] is DEPRECATED and will be removed in a future version. Use kubectl exec [POD] -- [COMMAND] instead.

root@flink-client-776886cf4f-9h47f:/opt/flink# ./bin/flink list --target kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster

2021-03-30 21:53:14,513 INFO  org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Retrieve flink cluster my-first-application-cluster successfully, JobManager Web Interface: http://144.25.13.78:8081

Waiting for response...

------------------ Running/Restarting Jobs -------------------

24.03.2021 00:13:04 : eea39629a1931b67eb395207739455ce : Flink Streaming Java API Skeleton (RUNNING)

--------------------------------------------------------------

No scheduled jobs.

root@flink-client-776886cf4f-9h47f:/opt/flink# ping 144.25.13.78

PING 144.25.13.78 (144.25.13.78) 56(84) bytes of data.

 

^C

--- 144.25.13.78 ping statistics ---

31 packets transmitted, 0 received, 100% packet loss, time 772ms

 

Question:

  1. The Flink client is able to list/cancel jobs. Based on the logs shared above, I expected to be able to ping 144.25.13.78. Why can I still not ping this address?
  2. Why is 144.25.13.78:8081 not accessible from outside, i.e. from my laptop’s browser? I am within the company’s VPN, and such a public load balancer should expose the Flink Web UI, right? I tried to debug the network configuration but failed to find a reason. Could you give me some hints?
  3. In production, what is the suggested approach to list and cancel jobs? The current manual work of “kubectl exec” into pods is not very reliable. How can this process be automated and integrated into CI/CD? Please share some blog posts if there are any, thanks.

 

 

Best,

Fuyao

 


Re: [External] : Re: Need help with executing Flink CLI for native Kubernetes deployment

Yang Wang
Hi Fuyao,

Thanks for sharing the progress.

1. The Flink client is able to list/cancel jobs. Based on the logs shared above, I expected to be able to ping 144.25.13.78. Why can I still not ping this address?

I think this is an environment problem. Not every IP address can be tested with the "ping" command. I suggest using "telnet 144.25.13.78 8081" to check the network connectivity.

2. Why is 144.25.13.78:8081 not accessible from outside, i.e. from my laptop’s browser? I am within the company’s VPN, and such a public load balancer should expose the Flink Web UI, right? I tried to debug the network configuration but failed to find a reason. Could you give me some hints?

As in my answer above, I think you need to check the network connectivity via "telnet 144.25.13.78 8081". Perhaps the firewall does not allow connections from your local machine (e.g. your local IP is not in the whitelist of the LoadBalancer).

3. In production, what is the suggested approach to list and cancel jobs? The current manual work of “kubectl exec” into pods is not very reliable. How can this process be automated and integrated into CI/CD? Please share some blog posts if there are any, thanks.

I think in a production environment you should have your own deployer, which takes care of submitting jobs and listing/cancelling them. The deployer could even help with triggering savepoints and managing the whole lifecycle of Flink applications. I once developed a PoC of a native-flink-k8s-operator[1]. It could be a starting point for your own deployer if you want to develop it in Java.
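As one small building block for that kind of automation, job IDs can be extracted from the `flink list` output and passed on to `flink cancel` or `flink stop`. This is only a sketch: it assumes the output keeps the `date : jobId : name (STATUS)` layout shown in the transcript earlier in this thread, and `running_job_ids` is an illustrative helper name. A real deployer would more likely talk to the REST API or use an operator.

```shell
#!/bin/sh
# Read `flink list` output on stdin and print the IDs of RUNNING jobs,
# one per line. Fields are separated by " : "; the job ID is the second field.
running_job_ids() {
    awk -F ' : ' '/\(RUNNING\)$/ && $2 ~ /^[0-9a-f]+$/ { print $2 }'
}

# Example pipeline (run where the REST endpoint is reachable):
#   ./bin/flink list --target kubernetes-application \
#       -Dkubernetes.cluster-id=my-first-application-cluster \
#     | running_job_ids \
#     | while read -r id; do
#         ./bin/flink cancel --target kubernetes-application \
#             -Dkubernetes.cluster-id=my-first-application-cluster "$id"
#       done
```

Parsing human-readable CLI output is fragile across Flink versions, which is one reason a dedicated deployer is suggested for production use.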



Best,
Yang

Fuyao Li <[hidden email]> 于2021年3月31日周三 上午6:37写道:

Hello Yang,

 

Thank you so much for providing me the flink-client.yaml. I was able to make some progress. I didn’t realize I should create an new pod flink-client to list/cancel jobs. I was trying to do such a thing from my local laptop. Maybe that is the reason why it doesn’t work. However, I still have several questions.

 

I created the deployment based on your flink-client.yaml

For the LoadBalancer mode:

 

After apply the cluster role binding yaml below.

 

# https://kubernetes.io/docs/reference/access-authn-authz/rbac/
# https://stackoverflow.com/questions/47973570/kubernetes-log-user-systemserviceaccountdefaultdefault-cannot-get-services
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
 
namespace: default
 
name: service-reader
rules:
-
apiGroups: [""] # "" indicates the core API group
 
resources: ["services"]
 
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]

 

And execute the command:

kubectl create clusterrolebinding service-reader-pod  --clusterrole=service-reader  --serviceaccount=default:default

 

I am able to exec in the flink-client pod and list/cancel jobs.

 

$ kubectl exec -it flink-client-776886cf4f-9h47f bash

kubectl exec [POD] [COMMAND] is DEPRECATED and will be removed in a future version. Use kubectl exec [POD] -- [COMMAND] instead.

root@flink-client-776886cf4f-9h47f:/opt/flink# ./bin/flink list --target kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster

2021-03-30 21:53:14,513 INFO  org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Retrieve flink cluster my-first-application-cluster successfully, JobManager Web Interface: http://144.25.13.78:8081

Waiting for response...

------------------ Running/Restarting Jobs -------------------

24.03.2021 00:13:04 : eea39629a1931b67eb395207739455ce : Flink Streaming Java API Skeleton (RUNNING)

--------------------------------------------------------------

No scheduled jobs.

root@flink-client-776886cf4f-9h47f:/opt/flink# ping 144.25.13.78

PING 144.25.13.78 (144.25.13.78) 56(84) bytes of data.

 

^C

--- 144.25.13.78 ping statistics ---

31 packets transmitted, 0 received, 100% packet loss, time 772ms

 

Question:

  1. The flink client is able to list/cancel jobs, based on logs shared above, I should be able to ping 144.25.13.78, why I still can NOT ping such address?
  2. Why is 144.25.13.78:8081 not accessible from outside, I mean on my laptop’s browser. I am within the company’s VPN and such public load balancer should expose the flink Web UI, right? I tried to debug the network configuration, but failed to find a reason, could you give me some hints?
  3. In production, what is the suggested approach to list and cancel jobs? The current manual work of “kubectl exec” into pods is not very reliable.. How to automate this process and integrate this CI/CD? Please share some blogs there is any, thanks.

 

 

Best,

Fuyao

 

From: Yang Wang <[hidden email]>
Date: Monday, March 29, 2021 at 20:40
To: Fuyao Li <[hidden email]>
Cc: user <[hidden email]>
Subject: [External] : Re: Need help with executing Flink CLI for native Kubernetes deployment

Hi Fuyao,

 

Thanks for trying the native Kubernetes integration.

 

Just like you know, the Flink rest service could be exposed in following three types, configured via "kubernetes.rest-service.exposed.type".

 

* ClusterIP, which means you could only access the Flink rest endpoint inside the K8s cluster. Simply, users could start a Flink client in the

K8s cluster via the following yaml file. And use "kubectl exec" to tunnel in the pod to create a Flink session/application cluster. Also the

"flink list/cancel" could work well.

 

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-client
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink-client
  template:
    metadata:
      labels:
        app: flink-client
    spec:
      containers:
      - name: client
        image: flink:1.12.2
        imagePullPolicy: Always
        args: ["sleep", "86400"]

 

* NodePort

Currently, we have a limitation that only the Kubernetes master nodes could be used to build the Flink exposed rest endpoint. So if your

APIServer node does not have the kube proxy, then the printed URL in the Flink client logs could not be used. We already have a ticket[1] to

support one of the slave nodes for accessing the rest endpoint. But I have not managed myself to get it done.

 

* LoadBalancer

Is the resolved rest endpoint "http://144.25.13.78:8081/" accessible on your Flink client side? If it is yes, then I think the Flink client

should be able to contact to JobManager rest server to list/cancel the jobs. I have verified in Alibaba container service, and it works well.

 

 

 

 

Best,

Yang

 

Fuyao Li <[hidden email]> 2021327日周六 上午5:59写道:

Hi Community, Yang,

 

I am new to Flink on native Kubernetes and I am trying to do a POC for native Kubernetes application mode on Oracle Cloud Infrastructure. I was following the documentation here step by step: [1]

 

I am using Flink 1.12.1, Scala 2.11, java 11.

I was able to create a native Kubernetes Deployment, but I am not able to use any further commands like list / cancel etc.. I always run into timeout error. I think the issue could be the JobManager Web Interface IP address printed after job deployment is not accessible. This issue is causing me not able to shut down the deployment with a savepoint. It could be Kubernetes configuration issue. I have exposed all related ports traffic and validated the security list, but still couldn’t make it work. Any help is appreciated.

 

 

The relevant Flink source code is CliFrontend.java class [2]

The ./bin/flink list and cancel command is trying to send traffic to the Flink dashboard UI IP address and it gets timeout. I tried to both LoadBalancer and NodePort option for -Dkubernetes.rest-service.exposed.type configuration. Both of them doesn’t work.

 

# List running job on the cluster (I can’t execute this command successfully due to timeout, logs shared below)

$ ./bin/flink list --target kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster

# Cancel running job (I can’t execute this command succcessfully)

$ ./bin/flink cancel --target kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster <jobId>

 

I think those commands needs to communicate with the endpoint that shows after the the job submission command.

 

  1. Use case 1(deploy with NodePort)

 

# fuyli @ fuyli-mac in ~/Development/flink-1.12.1 [17:59:00] C:127

$ ./bin/flink run-application \

    --target kubernetes-application \

    -Dkubernetes.cluster-id=my-first-application-cluster \

    -Dkubernetes.container.image=us-phoenix-1.ocir.io/idxglh0bz964/flink-demo:21.3.1 \

    -Dkubernetes.container.image.pull-policy=IfNotPresent \

    -Dkubernetes.container.image.pull-secrets=ocirsecret \

    -Dkubernetes.rest-service.exposed.type=NodePort \

    -Dkubernetes.service-account=flink-service-account \

local:///opt/flink/usrlib/quickstart-0.1.jar

 

 

When the expose type is NodePort, the printed messages says the the Flink  JobManager Web Interface:is at http://192.29.104.156:30996  192.29.104.156 is my Kubernetes apiserver address. 30996 is the port that exposes the service. However, Flink dashboard in this address is not resolvable.

I can only get access to dashboard UI on each node IP address(There are three nodes in my K8S cluster)

100.104.154.73:30996

100.104.154.74:30996

100.104.154.75:30996

      I got the following errors when trying to do list command for such a native Kubernetes deployment. See in [4]. According to the documentation here [3], this shouldn’t happen since Kubernetes api server address should also have the Flink Web UI… Did I miss any configurations in Kubernetes to make webUI available in Kubernetes apiserver address?

 

 

  1. Use case 2 (deploy with LoadBalancer)

# fuyli @ fuyli-mac in ~/Development/flink-1.12.1 [17:59:00] C:127

$ ./bin/flink run-application \

    --target kubernetes-application \

    -Dkubernetes.cluster-id=my-first-application-cluster \

    -Dkubernetes.container.image=us-phoenix-1.ocir.io/idxglh0bz964/flink-demo:21.3.1 \

    -Dkubernetes.container.image.pull-policy=IfNotPresent \

    -Dkubernetes.container.image.pull-secrets=ocirsecret \

    -Dkubernetes.rest-service.exposed.type=LoadBalancer \

    -Dkubernetes.service-account=flink-service-account \

local:///opt/flink/usrlib/quickstart-0.1.jar

 

 

After a while, when the external IP is resolved. It said Flink JobManager web interface is at the external-IP (LOAD BALANCER address) at: http://144.25.13.78:8081

When I execute the list command, I still got error after waiting for long time to let it get timeout. See errors here. [5]

 

I can still get access to NodeIP:<service-port>. In such case, I tend to believe it is a network issue. But still quite confused since I am already open all the traffics..

 

 

 

 

References:

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html

[2] https://github.com/apache/flink/blob/f3155e6c0213de7bf4b58a89fb1e1331dee7701a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java

[3] https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html#accessing-flinks-web-ui

[4] https://pastebin.ubuntu.com/p/WcJMwds52r/

[5] https://pastebin.ubuntu.com/p/m27BnQGXQc/

 

 

Thanks for your help in advance.

 

Best regards,

Fuyao

 

 


Re: [External] : Re: Need help with executing Flink CLI for native Kubernetes deployment

Fuyao Li-2

Hi Yang,

 

Thanks for sharing the insights.

 

For problem 1:

I don’t think I can use telnet in the container. Instead, I ran curl 144.25.13.78:8081 and could see the HTML of the Flink dashboard UI, which shows that this public IP is reachable from inside the cluster. As you mentioned, there might still be some network issues with the cluster. I will check further.

 

For problem 2:

I created a new K8S cluster with a bastion server that has a public IP assigned to it. Now I can finally see valid content in my browser. (There are still some problems connecting to some databases, but I think these network problems are not directly related to Flink; I can investigate them later.)

 

For problem 3:

Thanks for sharing the repo you created. I am not sure how much work it would take to develop a deployer. I understand it depends on proficiency, but could you give a rough estimate? If it is too complicated and other options are not significantly inferior to native Kubernetes, I might prefer one of them. I am currently comparing the following options for deploying on Kubernetes:

  1. Standalone K8S
  2. Native Kubernetes
  3. Flink operator (Google Cloud Platform/ Lyft) [1][2]

 

I also watched the demo video you presented [3]. I noticed you mentioned that native K8S is not going to replace the other two options. I still don’t fully understand why, given the limited explanation in the demo. Could you compare the tradeoffs a little? Thanks!

[1] https://github.com/GoogleCloudPlatform/flink-on-k8s-operator

[2]  https://github.com/lyft/flinkk8soperator

[3] https://youtu.be/pdFPr_VOWTU

 

Best,

Fuyao

 

 

From: Yang Wang <[hidden email]>
Date: Tuesday, March 30, 2021 at 19:15
To: Fuyao Li <[hidden email]>
Cc: user <[hidden email]>
Subject: Re: [External] : Re: Need help with executing Flink CLI for native Kubernetes deployment

Hi Fuyao,

 

Thanks for sharing the progress.

 

1. The Flink client is able to list/cancel jobs. Based on the logs shared above, I should be able to ping 144.25.13.78, so why can I still NOT ping this address?

 

I think this is an environment problem. Not every IP address can be tested with the "ping" command (it relies on ICMP, which a load balancer or firewall may drop even when the TCP port is open). I suggest using "telnet 144.25.13.78 8081" to check the network connectivity.
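Where telnet is not installed (as is common in container images), a TCP check can be approximated with bash alone. This is a minimal sketch, assuming bash and the coreutils "timeout" command are available in the container; check_tcp is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# Sketch: test TCP reachability without telnet or ping, using bash's
# built-in /dev/tcp redirection. check_tcp is a hypothetical helper name.
check_tcp() {
  local host="$1" port="$2"
  # The inner bash exits non-zero if the connection cannot be opened
  # within 3 seconds.
  timeout 3 bash -c "exec 3<>/dev/tcp/${host}/${port}" 2>/dev/null
}

# Usage (address taken from the thread):
#   check_tcp 144.25.13.78 8081 && echo open || echo closed
```

Unlike ping, this exercises the same TCP port that the Flink client uses, so a success here is a more relevant signal than an ICMP reply.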

 

2. Why is 144.25.13.78:8081 not accessible from outside, that is, from my laptop’s browser? I am on the company VPN, and such a public load balancer should expose the Flink Web UI, right? I tried to debug the network configuration but could not find the reason. Could you give me some hints?

 

As in my answer above, I think you need to check the network connectivity via "telnet 144.25.13.78 8081". Maybe the firewall does not allow connections from your local machine (e.g. your local IP is not in the whitelist of the LoadBalancer).

 

3. In production, what is the suggested approach to list and cancel jobs? The current manual work of “kubectl exec” into pods is not very reliable. How can this process be automated and integrated into CI/CD? Please share some blogs if there are any, thanks.

 

I think in a production environment you should have your own deployer, which takes care of submitting, listing, and cancelling jobs. The deployer could even help with triggering savepoints and managing the whole lifecycle of Flink applications. I once developed a PoC of native-flink-k8s-operator[1]. It could be a starting point for your own deployer if you want to develop it in Java.
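As a rough illustration of what the smallest possible automation step might look like, the sketch below calls the Flink REST API directly instead of using an interactive "kubectl exec". It is not the operator mentioned above, only a sketch under assumptions: it assumes the REST endpoint is reachable from where the script runs, and all function names are hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: drive list/cancel through the Flink REST API.
# Function names are hypothetical.

# URL that lists all jobs with their status (GET).
jobs_overview_url() {
  printf '%s/jobs/overview\n' "${1%/}"
}

# URL that cancels one job (PATCH).
cancel_job_url() {
  printf '%s/jobs/%s?mode=cancel\n' "${1%/}" "$2"
}

list_jobs() {
  curl --silent --fail --max-time 10 "$(jobs_overview_url "$1")"
}

cancel_job() {
  curl --silent --fail --max-time 10 -X PATCH "$(cancel_job_url "$1" "$2")"
}

# Usage from a machine that can reach the REST endpoint
# (address and job id taken from the thread):
#   list_jobs  http://144.25.13.78:8081
#   cancel_job http://144.25.13.78:8081 eea39629a1931b67eb395207739455ce
```

A CI/CD job could call these helpers from a pod or runner inside the cluster network, which avoids depending on a manually opened shell.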

 

 

 

Best,

Yang

 

Fuyao Li <[hidden email]> wrote on Wednesday, March 31, 2021 at 6:37 AM:

Hello Yang,

 

Thank you so much for providing the flink-client.yaml. I was able to make some progress. I didn’t realize I should create a new flink-client pod to list/cancel jobs; I was trying to do that from my local laptop, which may be why it didn’t work. However, I still have several questions.

 

I created the deployment based on your flink-client.yaml.

For the LoadBalancer mode:

First I applied the cluster role YAML below.

 

# https://kubernetes.io/docs/reference/access-authn-authz/rbac/
# https://stackoverflow.com/questions/47973570/kubernetes-log-user-systemserviceaccountdefaultdefault-cannot-get-services
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  namespace: default
  name: service-reader
rules:
- apiGroups: [""] # "" indicates the core API group
  resources: ["services"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]

 

Then I executed the command:

kubectl create clusterrolebinding service-reader-pod  --clusterrole=service-reader  --serviceaccount=default:default
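Whether the binding took effect can be checked by impersonating the service account with "kubectl auth can-i". The following is a small sketch; sa_subject and can_sa are hypothetical helper names, and the call needs cluster access plus permission to impersonate.

```shell
#!/usr/bin/env bash
# Sketch: verify the permissions granted to a service account.
# sa_subject and can_sa are hypothetical helper names.

# Build the user name Kubernetes uses for a service account.
sa_subject() {
  printf 'system:serviceaccount:%s:%s\n' "$1" "$2"
}

# usage: can_sa <verb> <resource> <namespace> <serviceaccount>
can_sa() {
  kubectl auth can-i "$1" "$2" --as="$(sa_subject "$3" "$4")"
}

# Usage (needs cluster access); kubectl answers "yes" or "no":
#   can_sa list services default default
```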

 

After that, I was able to exec into the flink-client pod and list/cancel jobs.

 

$ kubectl exec -it flink-client-776886cf4f-9h47f bash

kubectl exec [POD] [COMMAND] is DEPRECATED and will be removed in a future version. Use kubectl exec [POD] -- [COMMAND] instead.

root@flink-client-776886cf4f-9h47f:/opt/flink# ./bin/flink list --target kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster

2021-03-30 21:53:14,513 INFO  org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Retrieve flink cluster my-first-application-cluster successfully, JobManager Web Interface: http://144.25.13.78:8081

Waiting for response...

------------------ Running/Restarting Jobs -------------------

24.03.2021 00:13:04 : eea39629a1931b67eb395207739455ce : Flink Streaming Java API Skeleton (RUNNING)

--------------------------------------------------------------

No scheduled jobs.

root@flink-client-776886cf4f-9h47f:/opt/flink# ping 144.25.13.78

PING 144.25.13.78 (144.25.13.78) 56(84) bytes of data.

 

^C

--- 144.25.13.78 ping statistics ---

31 packets transmitted, 0 received, 100% packet loss, time 772ms
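The same list command can also be run without opening an interactive shell, using the "--" separator that the deprecation warning above asks for. This is a sketch under assumptions: flink_list_args is a hypothetical helper that only prints arguments, and it assumes the client Deployment is named flink-client as in the YAML shared earlier in the thread.

```shell
#!/usr/bin/env bash
# Sketch: run "flink list" inside the client pod non-interactively.
# flink_list_args is a hypothetical helper; it only prints arguments,
# one per line.
flink_list_args() {
  printf '%s\n' ./bin/flink list \
    --target kubernetes-application \
    "-Dkubernetes.cluster-id=$1"
}

# Usage (needs cluster access); deploy/flink-client selects a pod of the
# flink-client Deployment:
#   kubectl exec deploy/flink-client -- $(flink_list_args my-first-application-cluster)
```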

 

Questions:

  1. The Flink client is able to list/cancel jobs. Based on the logs shared above, I should be able to ping 144.25.13.78, so why can I still NOT ping this address?
  2. Why is 144.25.13.78:8081 not accessible from outside, that is, from my laptop’s browser? I am on the company VPN, and such a public load balancer should expose the Flink Web UI, right? I tried to debug the network configuration but could not find the reason. Could you give me some hints?
  3. In production, what is the suggested approach to list and cancel jobs? The current manual work of “kubectl exec” into pods is not very reliable. How can this process be automated and integrated into CI/CD? Please share some blogs if there are any, thanks.

 

 

Best,

Fuyao

 

From: Yang Wang <[hidden email]>
Date: Monday, March 29, 2021 at 20:40
To: Fuyao Li <[hidden email]>
Cc: user <[hidden email]>
Subject: [External] : Re: Need help with executing Flink CLI for native Kubernetes deployment

Hi Fuyao,

 

Thanks for trying the native Kubernetes integration.

 

As you know, the Flink REST service can be exposed in the following three ways, configured via "kubernetes.rest-service.exposed.type".

 

* ClusterIP, which means you can only access the Flink REST endpoint from inside the K8s cluster. A simple approach is to start a Flink client in the K8s cluster via the following YAML file, then use "kubectl exec" to get into the pod and create a Flink session/application cluster. "flink list/cancel" also work from there.

 

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-client
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink-client
  template:
    metadata:
      labels:
        app: flink-client
    spec:
      containers:
      - name: client
        image: flink:1.12.2
        imagePullPolicy: Always
        args: ["sleep", "86400"]

 

* NodePort

Currently, we have a limitation that only the Kubernetes master node address is used to build the exposed Flink REST endpoint. So if your APIServer node does not run kube-proxy, the URL printed in the Flink client logs cannot be used. We already have a ticket[1] to support using one of the worker nodes for accessing the REST endpoint, but I have not managed to get it done yet.

 

* LoadBalancer

Is the resolved REST endpoint "http://144.25.13.78:8081/" accessible from your Flink client side? If so, I think the Flink client should be able to contact the JobManager REST server to list/cancel the jobs. I have verified this on Alibaba Container Service, and it works well.
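For the ClusterIP case, another commonly used way to reach the REST endpoint from outside the cluster is a port-forward. The sketch below assumes the REST service is named "<cluster-id>-rest"; rest_service_name is a hypothetical helper that only encodes that assumed naming rule, so verify the real name with "kubectl get svc".

```shell
#!/usr/bin/env bash
# Sketch: reach a ClusterIP REST service from a laptop via port-forward.
# rest_service_name is hypothetical and encodes the assumed naming rule.
rest_service_name() {
  printf '%s-rest\n' "$1"
}

# Usage (needs cluster access; the first command blocks while forwarding):
#   kubectl port-forward "service/$(rest_service_name my-first-application-cluster)" 8081:8081
#   # in another terminal:
#   curl http://localhost:8081/jobs/overview
```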

 

 

 

 

Best,

Yang

 



Re: [External] : Re: Need help with executing Flink CLI for native Kubernetes deployment

Fuyao Li-2

Hello Yang,

 

I am just following up on my previous email to see if you have had time to reply.

I also took a deeper look at the Lyft K8s operator recently. It doesn’t seem to support HA natively; it still needs ZooKeeper. In this respect, native K8s is better. Any other ideas? Thanks for your help.

 

Best,

Fuyao

 



Re: [External] : Re: Need help with executing Flink CLI for native Kubernetes deployment

Yang Wang
Hi Fuyao,

Sorry for the late reply.

It is not very hard to develop your own deployer. I spent 3 days developing the PoC version of flink-native-k8s-operator, so if you want a fully functional K8s operator, two weeks may be enough. But if you want to put it into production, you may need some more time to polish it for easier use.

Flink native K8s integration is not going to replace standalone mode. First, not all Flink standalone clusters run on K8s. Second, standalone mode works really well with reactive mode[1].


Nor is Flink native K8s integration going to replace the K8s operator. The Flink K8s operator is not at the same level as the native integration: it is responsible for managing the lifecycle of Flink applications, and it makes submission more K8s-style. The Google and Lyft Flink K8s operators could support native mode; they just do not have that support right now.


Kubernetes HA works for both standalone mode and native mode. You can find the configuration here[2]. However, you might need some changes to the Flink K8s operator to make it work, because more args (e.g. --host) need to be added to the JobManager start command.
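For the native mode used earlier in this thread, enabling Kubernetes HA might look roughly like the sketch below. It assumes the option names documented for the Flink 1.12 Kubernetes HA services; ha_flags is a hypothetical helper, and the storage path is only a placeholder that must point to storage reachable from every pod.

```shell
#!/usr/bin/env bash
# Sketch: extra -D options for Kubernetes HA on the run-application command.
# ha_flags is a hypothetical helper; it prints one option per line.
ha_flags() {
  printf '%s\n' \
    "-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory" \
    "-Dhigh-availability.storageDir=$1"
}

# Usage (the s3 path is a placeholder, not taken from the thread):
#   ./bin/flink run-application \
#       --target kubernetes-application \
#       -Dkubernetes.cluster-id=my-first-application-cluster \
#       $(ha_flags s3://flink-ha/recovery) \
#       local:///opt/flink/usrlib/quickstart-0.1.jar
```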
 


Best,
Yang


Fuyao Li <[hidden email]> 于2021年4月5日周一 下午1:33写道:

Hello Yang,

 

I am just following up the previous email to see if you got some time to reply.

I also took a deeper look into lyft k8s operator recently. It seems it doesn’t support HA natively. It still needs the help of ZooKeeper. In terms of this, native k8s is better. Any other ideas? Thanks for your help.

 

Best,

Fuyao

 

From: Fuyao Li <[hidden email]>
Date: Thursday, April 1, 2021 at 12:22
To: Yang Wang <[hidden email]>
Cc: user <[hidden email]>
Subject: Re: [External] : Re: Need help with executing Flink CLI for native Kubernetes deployment

Hi Yang,

 

Thanks for sharing the insights.

 

For problem 1:

I think I can’t do telnet in the container. I tried to use curl 144.25.13.78:8081 and I could see the HTML of Flink dashboard UI. This proves such public IP is reachable inside the cluster. Just as you mentioned, there might still be some network issues with the cluster. I will do some further check.

 

For problem 2:

I created a new K8S cluster with bastion server with some public IP assigned to it. Finally, I can see something valid from my browser. (There still exist some problems with connecting to some databases, but I think these network problems are not directly related to Flink, I can investigate into it later.)

 

For problem 3:

Thanks for sharing the repo you created. I am not sure how much work it could take to develop a deployer. I understand is depends on the proficiency, could you give a rough estimation? If it is too complicated and some other options are not significantly inferior to native Kubernetes. I might prefer to choose other options. I am currently comparing different options to deploy in Kubernetes.

  1. Standalone K8S
  2. Native Kubernetes
  3. Flink operator (Google Cloud Platform/ Lyft) [1][2]

 

I also watched the demo video you presented. [3] I noticed you mentioned that native K8S is not going to replace the other two options. I still doesn’t fully get your idea with limited explanation in the demo. Could you compare the tradeoff a little bit? Thanks!

[1] https://github.com/GoogleCloudPlatform/flink-on-k8s-operator

[2]  https://github.com/lyft/flinkk8soperator

[3] https://youtu.be/pdFPr_VOWTU

 

Best,

Fuyao

 

 

From: Yang Wang <[hidden email]>
Date: Tuesday, March 30, 2021 at 19:15
To: Fuyao Li <[hidden email]>
Cc: user <[hidden email]>
Subject: Re: [External] : Re: Need help with executing Flink CLI for native Kubernetes deployment

Hi Fuyao,

 

Thanks for sharing the progress.

 

1. The flink client is able to list/cancel jobs, based on logs shared above, I should be able to ping 144.25.13.78, why I still can NOT ping such address?

 

I think this is a environment problem. Actually, not every IP address could be tested with "ping" command. I suggest you to use "telnet 144.25.13.78:8081" to check the network connectivity.

 

2. Why is 144.25.13.78:8081 not accessible from outside, i.e. from my laptop’s browser? I am within the company’s VPN, and such a public load balancer should expose the Flink Web UI, right? I tried to debug the network configuration but failed to find a reason. Could you give me some hints?

As in my answer above, I think you need to check the network connectivity via "telnet 144.25.13.78 8081". Maybe the firewall does not allow connections from your local machine (e.g. your local IP is not in the whitelist of the LoadBalancer IP).

 

3. In production, what is the suggested approach to list and cancel jobs? The current manual work of “kubectl exec” into pods is not very reliable. How can this process be automated and integrated into CI/CD? Please share some blogs if there are any, thanks.

I think in a production environment you should have your own deployer, which takes care of submitting, listing, and cancelling jobs. The deployer could even help with triggering savepoints and managing the whole lifecycle of Flink applications. I once developed a PoC of native-flink-k8s-operator[1]. It could be a starting point for your own deployer if you want to develop it in Java.
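As a rough illustration of what such a deployer could do under the hood, the sketch below talks to the JobManager REST endpoint directly with curl instead of shelling into a pod. The function names and the FLINK_REST variable are hypothetical; the paths (/jobs/overview, /jobs/:jobid with mode=cancel, /jobs/:jobid/savepoints) come from the Flink REST API, and the savepoint target directory is an assumption that must point at storage the cluster can write to.

```shell
#!/usr/bin/env bash
# Sketch of REST calls a deployer could issue against the JobManager.
# All function names here are illustrative, not an existing tool.

FLINK_REST="${FLINK_REST:-http://144.25.13.78:8081}"

# URL that lists jobs together with their status (GET).
jobs_overview_url() { printf '%s/jobs/overview\n' "${FLINK_REST%/}"; }

# URL that cancels a job when requested with the PATCH method.
cancel_url() { printf '%s/jobs/%s?mode=cancel\n' "${FLINK_REST%/}" "$1"; }

# URL that triggers a savepoint when sent a POST with a JSON body.
savepoint_url() { printf '%s/jobs/%s/savepoints\n' "${FLINK_REST%/}" "$1"; }

# JSON body for the savepoint request: $1 = target directory,
# $2 = "true" to cancel the job after the savepoint (default "false").
savepoint_body() {
  printf '{"target-directory": "%s", "cancel-job": %s}\n' "$1" "${2:-false}"
}

list_jobs()  { curl -sS --max-time 10 "$(jobs_overview_url)"; }
cancel_job() { curl -sS --max-time 10 -X PATCH "$(cancel_url "$1")"; }
trigger_savepoint() {
  curl -sS --max-time 30 -X POST -H 'Content-Type: application/json' \
    -d "$(savepoint_body "$2" "${3:-false}")" "$(savepoint_url "$1")"
}

# Usage:
#   list_jobs
#   trigger_savepoint <jobId> <savepoint-directory> true
```

A CI/CD job could call these functions from any machine that can reach the REST endpoint, which avoids the manual “kubectl exec” step.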

 

 

 

Best,

Yang

 

Fuyao Li <[hidden email]> wrote on Wednesday, March 31, 2021 at 6:37 AM:

Hello Yang,

 

Thank you so much for providing the flink-client.yaml. I was able to make some progress. I didn’t realize I should create a new flink-client pod to list/cancel jobs. I was trying to do this from my local laptop, which may be why it didn’t work. However, I still have several questions.

 

I created the deployment based on your flink-client.yaml

For the LoadBalancer mode:

 

After applying the cluster role YAML below

 

# https://kubernetes.io/docs/reference/access-authn-authz/rbac/
# https://stackoverflow.com/questions/47973570/kubernetes-log-user-systemserviceaccountdefaultdefault-cannot-get-services
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  # ClusterRoles are not namespaced, so this field can be omitted.
  namespace: default
  name: service-reader
rules:
- apiGroups: [""] # "" indicates the core API group
  resources: ["services"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]

 

And execute the command:

kubectl create clusterrolebinding service-reader-pod  --clusterrole=service-reader  --serviceaccount=default:default

 

I am able to exec into the flink-client pod and list/cancel jobs.

 

$ kubectl exec -it flink-client-776886cf4f-9h47f bash
kubectl exec [POD] [COMMAND] is DEPRECATED and will be removed in a future version. Use kubectl exec [POD] -- [COMMAND] instead.
root@flink-client-776886cf4f-9h47f:/opt/flink# ./bin/flink list --target kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster
2021-03-30 21:53:14,513 INFO  org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Retrieve flink cluster my-first-application-cluster successfully, JobManager Web Interface: http://144.25.13.78:8081
Waiting for response...
------------------ Running/Restarting Jobs -------------------
24.03.2021 00:13:04 : eea39629a1931b67eb395207739455ce : Flink Streaming Java API Skeleton (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
root@flink-client-776886cf4f-9h47f:/opt/flink# ping 144.25.13.78
PING 144.25.13.78 (144.25.13.78) 56(84) bytes of data.
^C
--- 144.25.13.78 ping statistics ---
31 packets transmitted, 0 received, 100% packet loss, time 772ms

 

Questions:

  1. The Flink client is able to list/cancel jobs. Based on the logs shared above, I should be able to ping 144.25.13.78, so why can I still NOT ping that address?
  2. Why is 144.25.13.78:8081 not accessible from outside, i.e. from my laptop’s browser? I am within the company’s VPN, and such a public load balancer should expose the Flink Web UI, right? I tried to debug the network configuration but failed to find a reason. Could you give me some hints?
  3. In production, what is the suggested approach to list and cancel jobs? The current manual work of “kubectl exec” into pods is not very reliable. How can this process be automated and integrated into CI/CD? Please share some blogs if there are any, thanks.

 

 

Best,

Fuyao

 

From: Yang Wang <[hidden email]>
Date: Monday, March 29, 2021 at 20:40
To: Fuyao Li <[hidden email]>
Cc: user <[hidden email]>
Subject: [External] : Re: Need help with executing Flink CLI for native Kubernetes deployment

Hi Fuyao,

 

Thanks for trying the native Kubernetes integration.

 

As you know, the Flink REST service can be exposed in the following three ways, configured via "kubernetes.rest-service.exposed.type".

 

* ClusterIP, which means you can only access the Flink REST endpoint from inside the K8s cluster. Users can simply start a Flink client in the K8s cluster via the following YAML file and use "kubectl exec" to tunnel into the pod and create a Flink session/application cluster. "flink list/cancel" also works well from there.

 

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-client
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink-client
  template:
    metadata:
      labels:
        app: flink-client
    spec:
      containers:
      - name: client
        image: flink:1.12.2
        imagePullPolicy: Always
        args: ["sleep", "86400"]
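Once this client Deployment is running, the Flink CLI can also be invoked in it non-interactively, which is easier to script than an interactive "kubectl exec -it ... bash" session. The sketch below only builds the command line; the function name flink_in_pod_cmd is hypothetical, and it assumes the container's working directory is /opt/flink, as in the official flink image.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the kubectl command that runs one Flink CLI
# action inside the client Deployment.
#   $1 = Deployment name, $2 = Flink cluster id, $3 = CLI action,
#   remaining arguments are appended to the action (e.g. a job id).
flink_in_pod_cmd() {
  local deployment="$1" cluster_id="$2" action="$3"
  shift 3
  echo kubectl exec "deploy/${deployment}" -- ./bin/flink "${action}" \
    --target kubernetes-application \
    "-Dkubernetes.cluster-id=${cluster_id}" "$@"
}

# Usage: print the command, or pipe it to a shell to execute it.
#   flink_in_pod_cmd flink-client my-first-application-cluster list
#   flink_in_pod_cmd flink-client my-first-application-cluster cancel <jobId> | sh
```

Printing the command first makes it easy to review what will run before executing it against a live cluster.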

 

* NodePort

Currently, we have a limitation that only the Kubernetes master nodes can be used to build the exposed Flink REST endpoint. So if your APIServer node does not run kube-proxy, the URL printed in the Flink client logs cannot be used. We already have a ticket[1] to support using one of the worker nodes for accessing the REST endpoint, but I have not managed to get it done yet.

 

* LoadBalancer

Is the resolved REST endpoint "http://144.25.13.78:8081/" accessible from your Flink client side? If yes, then I think the Flink client should be able to contact the JobManager REST server to list/cancel the jobs. I have verified this on Alibaba container service, and it works well.

 

 

 

 

Best,

Yang

 

Fuyao Li <[hidden email]> wrote on Saturday, March 27, 2021 at 5:59 AM:

Hi Community, Yang,

 

I am new to Flink on native Kubernetes and I am trying to do a POC for native Kubernetes application mode on Oracle Cloud Infrastructure. I was following the documentation here step by step: [1]

 

I am using Flink 1.12.1, Scala 2.11, Java 11.

I was able to create a native Kubernetes Deployment, but I am not able to use any further commands like list / cancel etc. I always run into a timeout error. I think the issue could be that the JobManager Web Interface IP address printed after job deployment is not accessible. This prevents me from shutting down the deployment with a savepoint. It could be a Kubernetes configuration issue. I have exposed traffic on all related ports and validated the security list, but still couldn’t make it work. Any help is appreciated.

 

 

The relevant Flink source code is the CliFrontend.java class [2].

The ./bin/flink list and cancel commands try to send traffic to the Flink dashboard UI IP address and time out. I tried both the LoadBalancer and NodePort options for the -Dkubernetes.rest-service.exposed.type configuration. Neither of them works.

 

# List running jobs on the cluster (I can’t execute this command successfully due to timeout, logs shared below)
$ ./bin/flink list --target kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster

# Cancel a running job (I can’t execute this command successfully)
$ ./bin/flink cancel --target kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster <jobId>

 

I think those commands need to communicate with the endpoint that is shown after the job submission command.

 

  1. Use case 1 (deploy with NodePort)

 

# fuyli @ fuyli-mac in ~/Development/flink-1.12.1 [17:59:00] C:127

$ ./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.container.image=us-phoenix-1.ocir.io/idxglh0bz964/flink-demo:21.3.1 \
    -Dkubernetes.container.image.pull-policy=IfNotPresent \
    -Dkubernetes.container.image.pull-secrets=ocirsecret \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    -Dkubernetes.service-account=flink-service-account \
    local:///opt/flink/usrlib/quickstart-0.1.jar

 

 

When the exposed type is NodePort, the printed message says the Flink JobManager Web Interface is at http://192.29.104.156:30996. 192.29.104.156 is my Kubernetes apiserver address, and 30996 is the port that exposes the service. However, the Flink dashboard is not reachable at this address.

I can only access the dashboard UI on each node's IP address (there are three nodes in my K8S cluster):

100.104.154.73:30996

100.104.154.74:30996

100.104.154.75:30996

I got the following errors when running the list command against this native Kubernetes deployment; see [4]. According to the documentation [3], this shouldn’t happen, since the Kubernetes apiserver address should also serve the Flink Web UI. Did I miss any Kubernetes configuration needed to make the Web UI available at the apiserver address?

 

 

  2. Use case 2 (deploy with LoadBalancer)

# fuyli @ fuyli-mac in ~/Development/flink-1.12.1 [17:59:00] C:127

$ ./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.container.image=us-phoenix-1.ocir.io/idxglh0bz964/flink-demo:21.3.1 \
    -Dkubernetes.container.image.pull-policy=IfNotPresent \
    -Dkubernetes.container.image.pull-secrets=ocirsecret \
    -Dkubernetes.rest-service.exposed.type=LoadBalancer \
    -Dkubernetes.service-account=flink-service-account \
    local:///opt/flink/usrlib/quickstart-0.1.jar

 

 

After a while, once the external IP is resolved, the output says the Flink JobManager web interface is at the external IP (the load balancer address): http://144.25.13.78:8081

When I execute the list command, I still get an error after a long wait for the timeout. See the errors here: [5]

 

I can still access NodeIP:<service-port>. Given that, I tend to believe it is a network issue, but I am still quite confused, since I have already opened all the traffic.

 

 

 

 

Reference:

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html

[2] https://github.com/apache/flink/blob/f3155e6c0213de7bf4b58a89fb1e1331dee7701a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java

[3] https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html#accessing-flinks-web-ui

[4] https://pastebin.ubuntu.com/p/WcJMwds52r/

[5] https://pastebin.ubuntu.com/p/m27BnQGXQc/

 

 

Thanks for your help in advance.

 

Best regards,

Fuyao

 

 


Re: [External] : Re: Need help with executing Flink CLI for native Kubernetes deployment

Fuyao Li-2

Hi Yang,

 

Thanks for the reply, that information is very helpful.

 

Best,

Fuyao

 

From: Yang Wang <[hidden email]>
Date: Tuesday, April 6, 2021 at 01:11
To: Fuyao Li <[hidden email]>
Cc: user <[hidden email]>
Subject: Re: [External] : Re: Need help with executing Flink CLI for native Kubernetes deployment

Hi Fuyao,

 

Sorry for the late reply.

 

It is not very hard to develop your own deployer. Actually, it took me 3 days to develop the PoC version of flink-native-k8s-operator. So if you want a fully functional K8s operator, maybe two weeks is enough. But if you want to put it into production, you may need some more time to polish it for easier use.

 

Flink native K8s integration is not going to replace the standalone mode. First, not all Flink standalone clusters run on K8s. Second, standalone mode works really well with reactive mode[1].

 

 

Flink native K8s integration is not going to replace the K8s operator either. Actually, the Flink K8s operator is not on the same level as the Flink native integration. The Flink K8s operator is responsible for managing the lifecycle of a Flink application, and it also makes submission more K8s-style. The Google and Lyft Flink K8s operators could support native mode; they just do not have that support right now.

 

 

Kubernetes HA works for both standalone mode and native mode. You can find the configuration here[2]. However, you might need some changes to the Flink K8s operator to make it work, because more args (e.g. --host) need to be added to the JobManager start commands.
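For the native mode, the Kubernetes HA services in Flink 1.12 are switched on through configuration options that can be passed as -D flags to the same run-application command used earlier in this thread. A minimal sketch follows, assuming the option names from the Flink 1.12 documentation (high-availability and high-availability.storageDir); the function name ha_flags and the storage path in the usage note are hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the extra -D flags that enable Kubernetes HA.
#   $1 = storage directory for HA metadata; it must be a location that the
#        JobManager pods can read and write.
ha_flags() {
  local storage_dir="$1"
  printf '%s\n' \
    "-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory" \
    "-Dhigh-availability.storageDir=${storage_dir}"
}

# Usage: append the flags to the existing submission command, e.g.
#   ./bin/flink run-application --target kubernetes-application \
#     -Dkubernetes.cluster-id=my-first-application-cluster \
#     $(ha_flags <storage-dir>) \
#     local:///opt/flink/usrlib/quickstart-0.1.jar
```

The kubernetes.cluster-id option is still required, and the service account also needs permission to work with ConfigMaps, which the HA services use to store leader information.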

 

 

 

Best,

Yang

 

 

Fuyao Li <[hidden email]> wrote on Monday, April 5, 2021 at 1:33 PM:

Hello Yang,

 

I am just following up on the previous email to see if you have had time to reply.

I also took a deeper look into the Lyft K8s operator recently. It seems it doesn’t support HA natively; it still needs the help of ZooKeeper. In this respect, native K8s is better. Any other ideas? Thanks for your help.

 

Best,

Fuyao

 
