Flink’s Kubernetes HA services - NOT working


Daniel Peled

Hey,

We are using standalone Flink on Kubernetes,
and we have followed the instructions in the "Kubernetes HA Services" documentation:
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html
We were unable to make it work and are facing a lot of problems.
For example, some jobs don't start, complaining that there are not enough slots available - although there are enough slots, and it seems as if the JobManager is not aware of all the TaskManagers.
In another scenario we were unable to run any job at all:
the Flink dashboard is unresponsive and we get the error
"flink service temporarily unavailable due to an ongoing leader election. please refresh"
We believe we are missing some configuration.
Are there any more detailed instructions?
Any suggestions/tips?
Attached is the log of the JobManager from one of the attempts.

Please give me some advice.
BR,
Danny 


Attachment: jobmanager_log (3).txt (279K)

Re: Flink’s Kubernetes HA services - NOT working

Matthias
Hi Daniel,
what exact configuration did you use? Did you use the resource definitions provided in the Standalone Flink on Kubernetes docs [1]? Did you do anything differently from what the documentation describes?

Best,
Matthias




Re: Flink’s Kubernetes HA services - NOT working

Matthias
One other thing: It looks like you've set high-availability.storageDir to a local path file:///opt/flink/recovery. You should use a storage path that is accessible from all Flink cluster components (e.g. using S3). Only references are stored in Kubernetes ConfigMaps [1].
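For illustration, a minimal sketch of what a shared storageDir could look like in flink-conf.yaml, assuming a hypothetical S3 bucket and an S3 filesystem plugin (e.g. flink-s3-fs-presto) being available in the image:

    high-availability.storageDir: s3://my-flink-ha-bucket/recovery   # hypothetical bucket; any path reachable by all JobManagers and TaskManagers works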





--

Matthias Pohl | Engineer


Follow us @VervericaData Ververica

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton Wehner

Re: Flink’s Kubernetes HA services - NOT working

Matthias
I'm adding the Flink user ML to the conversation again.

On Mon, Feb 15, 2021 at 8:18 AM Matthias Pohl <[hidden email]> wrote:
Hi Omer,
thanks for sharing the configuration. You're right: Using NFS for HA's storageDir is fine. 

About the error message you're referring to: I haven't worked with the HA Kubernetes service yet, but RBAC is a good hint. Flink's native Kubernetes documentation [1] points out that you can use a custom service account. That account needs special permissions to start/stop pods automatically (which does not apply in your case), but also to access ConfigMaps. You might want to try setting the permissions as described in [1].
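A rough sketch of how the Deployments' pod template could reference such an account (the name flink-service-account is a placeholder; the account still needs to be created and granted ConfigMap access):

    spec:
      serviceAccountName: flink-service-account   # placeholder; must exist and have ConfigMap permissions
      containers:
      - name: jobmanager
        image: flink:1.12.1-scala_2.11-java11
        args: [ "jobmanager" ]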

Best,
Matthias


On Sun, Feb 14, 2021 at 7:16 PM Omer Ozery <[hidden email]> wrote:
Hey Matthias.
My name is Omer, I am Daniel's DevOps engineer, and I will elaborate on our Flink situation.
These are our Flink resource definitions, as generated by the helm template command (minus the log4j/metrics configuration and some sensitive data):
---
# Source: flink/templates/flink-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |
    jobmanager.rpc.address: flink-jobmanager
    jobmanager.rpc.port: 6123
    jobmanager.execution.failover-strategy: region
    jobmanager.memory.process.size: 8g
    taskmanager.memory.process.size: 24g
    taskmanager.memory.task.off-heap.size: 1g
    taskmanager.numberOfTaskSlots: 4
    queryable-state.proxy.ports: 6125
    queryable-state.enable: true
    blob.server.port: 6124
    parallelism.default: 1
    state.backend.incremental: true
    state.backend: rocksdb
    state.backend.rocksdb.localdir: /opt/flink/rocksdb
    state.checkpoints.dir: file:///opt/flink/checkpoints
    classloader.resolve-order: child-first
    kubernetes.cluster-id: flink-cluster
    kubernetes.namespace: intel360-beta
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: file:///opt/flink/recovery

---
# Source: flink/templates/flink-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
  labels:
    {}
spec:
  ports:
  - name: http-ui
    port: 8081
    targetPort: http-ui
  - name: tcp-rpc
    port: 6123
    targetPort: tcp-rpc
  - name: tcp-blob
    port: 6124
    targetPort: tcp-blob
  selector:
    app: flink
    component: jobmanager
---
# Source: flink/templates/flink-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
      annotations:
        checksum/config: f22b7a7c8a618b2f5257476737883s6a76a88c25b37e455257e120d8e4ed941
    spec:
      containers:
      - name: jobmanager
        image: flink:1.12.1-scala_2.11-java11
        args: [ "jobmanager" ]
        ports:
        - name: http-ui
          containerPort: 8081
        - name: tcp-rpc
          containerPort: 6123
        - name: tcp-blob
          containerPort: 6124
        resources:
          {}
        # Environment Variables
        env:
        - name: ENABLE_CHECKPOINTING
          value: "true"
        - name: JOB_MANAGER_RPC_ADDRESS
          value: "flink-jobmanager"
        volumeMounts:
        - name: flink-config
          mountPath: /opt/flink/conf/flink-conf.yaml
          subPath: flink-conf.yaml
        # NFS mounts
        - name: flink-checkpoints
          mountPath: "/opt/flink/checkpoints"
        - name: flink-recovery
          mountPath: "/opt/flink/recovery"
      volumes:
      - name: flink-config
        configMap:
          name: flink-config
      # NFS volumes
      - name: flink-checkpoints
        nfs:
          server: "my-nfs-server.my-org"
          path: "/my-shared-nfs-dir/flink/checkpoints"
      - name: flink-recovery
        nfs:
          server: "my-nfs-server.my-org"
          path: "/my-shared-nfs-dir/flink/recovery"
---
# Source: flink/templates/flink-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 7
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
      annotations:
        checksum/config: f22b7a7c8a618b2f5257476737883s6a76a88c25b37e455257e120d8e4ed941
    spec:
      containers:
      - name: taskmanager
        image: flink:1.12.1-scala_2.11-java11
        args: [ "taskmanager" ]
        resources:
          limits:
            cpu: 6000m
            memory: 24Gi
          requests:
            cpu: 6000m
            memory: 24Gi
        # Environment Variables
        env:
        - name: ENABLE_CHECKPOINTING
          value: "true"
        - name: JOB_MANAGER_RPC_ADDRESS
          value: "flink-jobmanager"
        volumeMounts:
        - name: flink-config
          mountPath: /opt/flink/conf/flink-conf.yaml
          subPath: flink-conf.yaml
        # NFS mounts
        - name: flink-checkpoints
          mountPath: "/opt/flink/checkpoints"
        - name: flink-recovery
          mountPath: "/opt/flink/recovery"
      volumes:
      - name: flink-config
        configMap:
          name: flink-config
      # NFS volumes
      - name: flink-checkpoints
        nfs:
          server: "my-nfs-server.my-org"
          path: "/my-shared-nfs-dir/flink/checkpoints"
      - name: flink-recovery
        nfs:
          server: "my-nfs-server.my-org"
          path: "/my-shared-nfs-dir/flink/recovery"
---
# Source: flink/templates/flink-ingress.yaml
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: jobmanager
spec:
  rules:
    - host: my.flink.job.manager.url
      http:
        paths:
          - path: /
            backend:
              serviceName: flink-jobmanager
              servicePort: 8081
---

As you can see, we are using the skeleton of the standalone configuration as documented here:
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/standalone/kubernetes.html
with some company-specific configuration, obviously, but still within the scope of that document.

On a normal day, without the HA configuration, everything works fine.
When trying to configure Kubernetes HA using this document:
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html
with the following parameters:
    kubernetes.cluster-id: flink-cluster
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: file:///opt/flink/recovery
   
the JobManager fails with the following error:
2021-02-14 16:57:19,103 ERROR io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector [] - Exception occurred while acquiring lock 'ConfigMapLock: default - flink-cluster-restserver-leader (54211907-eba9-47b1-813e-11f12ba89ccb)'
io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: GET at: https://10.96.0.1/api/v1/namespaces/default/configmaps/flink-cluster-restserver-leader. Message: Forbidden!Configured service account doesn't have access. Service account may have been revoked. configmaps "flink-cluster-restserver-leader" is forbidden: User "system:serviceaccount:intel360-beta:default" cannot get resource "configmaps" in API group "" in the namespace "default".

So we added this line as well (as you can see in the flink-config ConfigMap above):
kubernetes.namespace: intel360-beta
Although it is not part of the document, and I don't think Flink should need to be aware of the namespace it resides in (it hurts the modularity of the upper layers of configuration), we added it regardless and then got the following error:

2021-02-14 17:00:57,086 ERROR io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector [] - Exception occurred while acquiring lock 'ConfigMapLock: intel360-beta - flink-cluster-restserver-leader (66180ce6-c62e-4ea4-9420-0a3134bef3d6)'
io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: GET at: https://10.96.0.1/api/v1/namespaces/intel360-beta/configmaps/flink-cluster-restserver-leader. Message: Forbidden!Configured service account doesn't have access. Service account may have been revoked. configmaps "flink-cluster-restserver-leader" is forbidden: User "system:serviceaccount:intel360-beta:default" cannot get resource "configmaps" in API group "" in the namespace "intel360-beta".

which is basically the same error message, just directed at Flink's own namespace.
My question is: do I need to add RBAC to Flink's service account? I got the impression from the official Flink documents and some blog responses that it is designed to function without any special permissions.
If we do need RBAC, can you point to official documentation describing the exact permissions required?

NOTE: As you can see, our checkpoint and recovery locations point to local directories mounted on an NFS share that all TaskManagers and the JobManager can access, since our infrastructure is bare-metal by design (although this particular cluster is hosted on AWS).

Thanks in advance,
Omer




Re: Flink’s Kubernetes HA services - NOT working

Till Rohrmann
Hi Omer,

I think Matthias is right. The K8s HA services create and edit config maps. Hence they need the rights to do this. In the native K8s documentation there is a section about how to create a service account with the right permissions [1].
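For reference, a minimal sketch of a namespace-scoped Role and RoleBinding that only covers ConfigMaps (names here are placeholders; take the exact verb list from the documentation once it is updated):

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: flink-ha-configmaps
  namespace: intel360-beta
rules:
- apiGroups: [""]
  resources: ["configmaps"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: flink-ha-configmaps
  namespace: intel360-beta
subjects:
- kind: ServiceAccount
  name: default
  namespace: intel360-beta
roleRef:
  kind: Role
  name: flink-ha-configmaps
  apiGroup: rbac.authorization.k8s.io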

I think that our K8s HA documentation currently lacks this part. I will create a PR to update the documentation.


Cheers,
Till 


Re: Flink’s Kubernetes HA services - NOT working

Till Rohrmann
If you are running a session cluster, then Flink will create a config map for every submitted job. These config maps will unfortunately only be cleaned up when you shut down the cluster. This is a known limitation which we want to fix soon [1, 2].

If you can help us with updating the documentation properly (e.g. which role binding to use for the service account with minimal permissions), then we would highly appreciate your help.


Cheers,
Till

On Tue, Feb 16, 2021 at 3:45 PM Omer Ozery <[hidden email]> wrote:
Hey guys,
You are right, the documentation lacks this part, and Flink needs it to start.
I'm not sure it has 100% solved our problem, because it creates endless copies of the ConfigMaps with random IDs, and our jobs still can't be scheduled for some reason.
I will investigate this further with Daniel and let you know.
Also, the access granted by that document is vast, imprecise and cluster-wide (it uses a default edit-all ClusterRole), so when you create the PR, please make sure that whoever is in charge of the Flink-Kubernetes integration documents the exact permissions to create and attach to Flink's components.

Thanks very much for your help!
we will keep you updated.
Omer



Re: Flink’s Kubernetes HA services - NOT working

Till Rohrmann
Hi Omer,

could you share a bit more of the logs with us? I would be interested in what has happened before "Stopping DefaultLeaderRetrievalService" is logged. One problem you might run into is FLINK-20417. This problem should be fixed with Flink 1.12.2.
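If that turns out to be the issue, picking up the fix should only require bumping the image tag in both Deployments, assuming the corresponding tag is published:

      containers:
      - name: jobmanager
        image: flink:1.12.2-scala_2.11-java11   # instead of flink:1.12.1-scala_2.11-java11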


Cheers,
Till

On Thu, Feb 18, 2021 at 2:54 PM Omer Ozery <[hidden email]> wrote:
Hey guys
It looks like the Flink cluster is deployed successfully; it starts with no errors.
But when we try to deploy the jobs, some jobs start and some can't find available slots for some reason, even when we have free ones.
It happens with different jobs every time.
Below are the exceptions thrown by the components.
I also attached an image showing the TaskManagers and the free slots.

The JobManager throws this error:
2021-02-17 11:19:41,956 INFO  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Stopping DefaultLeaderRetrievalService.
2021-02-17 11:19:41,956 INFO  org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver [] - Stopping KubernetesLeaderRetrievalDriver{configMapName='flink-cluster-1796779318657734fcbc261f8d01d250-jobmanager-leader'}.
2021-02-17 11:19:41,956 INFO  org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapWatcher [] - The watcher is closing.
2021-02-17 11:19:41,956 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registration of job manager [hidden email]://flink@flink-jobmanager:6123/user/rpc/jobmanager_19 failed.
2021-02-17 11:19:41,956 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registration of job manager [hidden email]://flink@flink-jobmanager:6123/user/rpc/jobmanager_19 failed.
2021-02-17 11:19:41,956 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registration of job manager [hidden email]://flink@flink-jobmanager:6123/user/rpc/jobmanager_19 failed.
2021-02-17 11:19:41,956 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registration of job manager [hidden email]://flink@flink-jobmanager:6123/user/rpc/jobmanager_19 failed.
2021-02-17 11:19:41,956 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Registration at ResourceManager was declined: java.lang.Exception: Job leader id service has been stopped.


java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[?:?]
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[?:?]
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
at org.apache.flink.runtime.scheduler.SharedSlot.cancelLogicalSlotRequest(SharedSlot.java:223) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator.cancelLogicalSlotRequest(SlotSharingExecutionSlotAllocator.java:168) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.scheduler.SharingPhysicalSlotRequestBulk.cancel(SharingPhysicalSlotRequestBulk.java:86) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkWithTimestamp.cancel(PhysicalSlotRequestBulkWithTimestamp.java:66) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:91) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:442) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:209) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.12.1.jar:1.12.1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.12.1.jar:1.12.1]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.12.1.jar:1.12.1]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.12.1.jar:1.12.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.12.1.jar:1.12.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.12.1.jar:1.12.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.12.1.jar:1.12.1]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.12.1.jar:1.12.1]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.12.1.jar:1.12.1]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.12.1.jar:1.12.1]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.12.1.jar:1.12.1]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.12.1.jar:1.12.1]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.12.1.jar:1.12.1]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.12.1.jar:1.12.1]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.12.1.jar:1.12.1]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.12.1.jar:1.12.1]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.12.1.jar:1.12.1]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.12.1.jar:1.12.1]
Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout
at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
... 24 more
Caused by: java.util.concurrent.TimeoutException: Timeout has occurred: 300000 ms
at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
... 24 more
2021-02-17 11:18:18,977 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Discarding the results produced by task execution 317da8a62cf57f74cf587e6bd733f5e7.
2021-02-17 11:18:18,977 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task e1ff02ab7657079c9d2254f12c031b2a_0.
2021-02-17 11:18:18,977 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 13 tasks should be restarted to recover the failed task e1ff02ab7657079c9d2254f12c031b2a_0.

The jobs throw this error:
2021-02-17 14:13:48
java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at org.apache.flink.runtime.scheduler.SharedSlot.cancelLogicalSlotRequest(SharedSlot.java:223)
at org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator.cancelLogicalSlotRequest(SlotSharingExecutionSlotAllocator.java:168)
at org.apache.flink.runtime.scheduler.SharingPhysicalSlotRequestBulk.cancel(SharingPhysicalSlotRequestBulk.java:86)
at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkWithTimestamp.cancel(PhysicalSlotRequestBulkWithTimestamp.java:66)
at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:91)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:442)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:209)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(Unknown Source)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout
at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86)
... 24 more
Caused by: java.util.concurrent.TimeoutException: Timeout has occurred: 300000 ms
... 25 more

Any suggestions?

Thanks
Omer

On Tue, Feb 16, 2021 at 6:54 PM Till Rohrmann <[hidden email]> wrote:
If you are running a session cluster, then Flink will create a config map for every submitted job. These config maps will unfortunately only be cleaned up when you shut down the cluster. This is a known limitation which we want to fix soon [1, 2].

If you can help us with updating the documentation properly (e.g. which role binding to use for the service account with minimal permissions), then we would highly appreciate your help.


Cheers,
Till

On Tue, Feb 16, 2021 at 3:45 PM Omer Ozery <[hidden email]> wrote:
Hey guys,
You are right, the documentation lacks this part, and the flink needs it to start.
I'm not sure if it's 100% solved our problem because it creates endless copies of the configmaps with random ids and also our jobs can't schedule for some reason.
I will investigate this further with Daniel and let you know. 
Also the access control given using this document is vast, imprecise and clusterwide (it uses a default edit-all clusterRole), so when you create a PR, make sure that whoever is in charge of  the flink-k8s integration, document the accurate permissions to create and attach to the flink's components.

Thanks very much for your help!
we will keep you updated.
Omer

On Tue, Feb 16, 2021 at 3:26 PM Till Rohrmann <[hidden email]> wrote:
Hi Omar,

I think Matthias is right. The K8s HA services create and edit config maps. Hence they need the rights to do this. In the native K8s documentation there is a section about how to create a service account with the right permissions [1].

I think that our K8s HA documentation currently lacks this part. I will create a PR to update the documentation.


Cheers,
Till 

On Mon, Feb 15, 2021 at 9:32 AM Matthias Pohl <[hidden email]> wrote:
I'm adding the Flink user ML to the conversation again.

On Mon, Feb 15, 2021 at 8:18 AM Matthias Pohl <[hidden email]> wrote:
Hi Omer,
thanks for sharing the configuration. You're right: Using NFS for HA's storageDir is fine. 

About the error message you're referring to: I haven't worked with the HA k8s service, yet. But the RBAC is a good hint. Flink's native Kubernetes documentation [1] points out that you can use a custom service account. This one needs special permissions to start/stop pods automatically (which does not apply in your case) but also to access ConfigMaps. You might want to try setting the permission as described in [1].

Best,
Matthias


On Sun, Feb 14, 2021 at 7:16 PM Omer Ozery <[hidden email]> wrote:
Hey Matthias.
My name is Omer, i am Daniel's devops, i will elaborate about our flink situation.
these our flink resource definitions, as they are generated using the helm template command (minus log4j,metrics configuration and some sensitive data)
---
# Source: flink/templates/flink-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |
    jobmanager.rpc.address: flink-jobmanager
    jobmanager.rpc.port: 6123
    jobmanager.execution.failover-strategy: region
    jobmanager.memory.process.size: 8g
    taskmanager.memory.process.size: 24g
    taskmanager.memory.task.off-heap.size: 1g
    taskmanager.numberOfTaskSlots: 4
    queryable-state.proxy.ports: 6125
    queryable-state.enable: true
    blob.server.port: 6124
    parallelism.default: 1
    state.backend.incremental: true
    state.backend: rocksdb
    state.backend.rocksdb.localdir: /opt/flink/rocksdb
    state.checkpoints.dir: file:///opt/flink/checkpoints
    classloader.resolve-order: child-first
    kubernetes.cluster-id: flink-cluster
    kubernetes.namespace: intel360-beta
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: file:///opt/flink/recovery

---
# Source: flink/templates/flink-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
  labels:
    {}
spec:
  ports:
  - name: http-ui
    port: 8081
    targetPort: http-ui
  - name: tcp-rpc
    port: 6123
    targetPort: tcp-rpc
  - name: tcp-blob
    port: 6124
    targetPort: tcp-blob
  selector:
    app: flink
    component: jobmanager
---
# Source: flink/templates/flink-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
      annotations:
        checksum/config: f22b7a7c8a618b2f5257476737883s6a76a88c25b37e455257e120d8e4ed941
    spec:
      containers:
      - name: jobmanager
        image: flink:1.12.1-scala_2.11-java11
        args: [ "jobmanager" ]
        ports:
        - name: http-ui
          containerPort: 8081
        - name: tcp-rpc
          containerPort: 6123
        - name: tcp-blob
          containerPort: 6124
        resources:
          {}
        # Environment Variables
        env:
        - name: ENABLE_CHECKPOINTING
          value: "true"
        - name: JOB_MANAGER_RPC_ADDRESS
          value: "flink-jobmanager"
        volumeMounts:
        - name: flink-config
          mountPath: /opt/flink/conf/flink-conf.yaml
          subPath: flink-conf.yaml
        # NFS mounts
        - name: flink-checkpoints
          mountPath: "/opt/flink/checkpoints"
        - name: flink-recovery
          mountPath: "/opt/flink/recovery"
      volumes:
      - name: flink-config
        configMap:
          name: flink-config
      # NFS volumes
      - name: flink-checkpoints
        nfs:
          server: "my-nfs-server.my-org"
          path: "/my-shared-nfs-dir/flink/checkpoints"
      - name: flink-recovery
        nfs:
          server: "my-nfs-server.my-org"
          path: "/my-shared-nfs-dir/flink/recovery"
---
# Source: flink/templates/flink-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 7
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
      annotations:
        checksum/config: f22b7a7c8a618b2f5257476737883s6a76a88c25b37e455257e120d8e4ed941
    spec:
      containers:
      - name: taskmanager
        image: flink:1.12.1-scala_2.11-java11
        args: [ "taskmanager" ]
        resources:
          limits:
            cpu: 6000m
            memory: 24Gi
          requests:
            cpu: 6000m
            memory: 24Gi
        # Environment Variables
        env:
        - name: ENABLE_CHECKPOINTING
          value: "true"
        - name: JOB_MANAGER_RPC_ADDRESS
          value: "flink-jobmanager"
        volumeMounts:
        - name: flink-config
          mountPath: /opt/flink/conf/flink-conf.yaml
          subPath: flink-conf.yaml
        # NFS mounts
        - name: flink-checkpoints
          mountPath: "/opt/flink/checkpoints"
        - name: flink-recovery
          mountPath: "/opt/flink/recovery"
      volumes:
      - name: flink-config
        configMap:
          name: flink-config
      # NFS volumes
      - name: flink-checkpoints
        nfs:
          server: "my-nfs-server.my-org"
          path: "/my-shared-nfs-dir/flink/checkpoints"
      - name: flink-recovery
        nfs:
          server: "my-nfs-server.my-org"
          path: "/my-shared-nfs-dir/flink/recovery"
---
# Source: flink/templates/flink-ingress.yaml
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: jobmanager
spec:
  rules:
    - host: my.flink.job.manager.url
      http:
        paths:
          - path: /
            backend:
              serviceName: flink-jobmanager
              servicePort: 8081
---

As you can see, we are using the skeleton of the standalone configuration as documented here:
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/standalone/kubernetes.html
with some company-specific configuration, obviously, but still within the scope of that document.

On a normal, beautiful day, without the HA configuration, everything works fine.
When we try to configure Kubernetes HA using this document:
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html
with the following parameters:
    kubernetes.cluster-id: flink-cluster
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: file:///opt/flink/recovery
   
the jobmanager fails with the following error:
2021-02-14 16:57:19,103 ERROR io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector [] - Exception occurred while acquiring lock 'ConfigMapLock: default - flink-cluster-restserver-leader (54211907-eba9-47b1-813e-11f12ba89ccb)'
io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: GET at: https://10.96.0.1/api/v1/namespaces/default/configmaps/flink-cluster-restserver-leader. Message: Forbidden!Configured service account doesn't have access. Service account may have been revoked. configmaps "flink-cluster-restserver-leader" is forbidden: User "system:serviceaccount:intel360-beta:default" cannot get resource "configmaps" in API group "" in the namespace "default".

So we added this line as well (as you can see in the flink-config ConfigMap above):
kubernetes.namespace: intel360-beta
It is not part of the document, and I don't think Flink should need to be aware of the namespace it resides in, since that hurts the modularity of the upper configuration layers. Regardless, we added it, and then we got the following error:

2021-02-14 17:00:57,086 ERROR io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector [] - Exception occurred while acquiring lock 'ConfigMapLock: intel360-beta - flink-cluster-restserver-leader (66180ce6-c62e-4ea4-9420-0a3134bef3d6)'
io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: GET at: https://10.96.0.1/api/v1/namespaces/intel360-beta/configmaps/flink-cluster-restserver-leader. Message: Forbidden!Configured service account doesn't have access. Service account may have been revoked. configmaps "flink-cluster-restserver-leader" is forbidden: User "system:serviceaccount:intel360-beta:default" cannot get resource "configmaps" in API group "" in the namespace "intel360-beta".

which is basically the same error message, just directed at Flink's namespace.
My question is: do I need to add RBAC to Flink's service account? I got the impression from the official Flink documents and some blog responses that it is designed to function without any special permissions.
If we do need RBAC, can you give an official documentation reference for the exact permissions?
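(For what it's worth, a quick way to check what a given service account is actually allowed to do is kubectl impersonation; the namespace and account below are just our values:)

# Should print "yes" once the service account is allowed to read the HA ConfigMaps
kubectl auth can-i get configmaps \
  --namespace intel360-beta \
  --as system:serviceaccount:intel360-beta:default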

NOTE: as you can see, our flink-checkpoints and recovery locations point to local directories mounted on an NFS share common to all TaskManagers and the JobManager, since our infrastructure is bare-metal by design (although this particular environment is hosted in AWS).

thanks in advance
Omer



Reply | Threaded
Open this post in threaded view
|

Re: Flink’s Kubernetes HA services - NOT working

Till Rohrmann
Hi Omer,

thanks for the logs. Could you tell us a bit more about the concrete setup of your Flink K8s cluster? It looks to me as if the ResourceManager cannot talk to the JobMaster which tries to register at the RM. Also, some JobMasters don't seem to reach the ResourceManager. Could it be that you are running standby JobManager processes? If that is the case, then using a K8s service for the communication between the Flink components will not work.

Cheers,
Till

On Sun, Feb 28, 2021 at 11:29 AM Omer Ozery <[hidden email]> wrote:
Sorry for the late reply.
I have attached three jobmanager logs to this mail:

1. flink-jobmanager at log level INFO - when nothing is working from the moment we try to deploy the jobs (even the jobs overview in the UI is stuck)
2. flink-jobmanager at log level INFO with one successful job - you can see that it starts and handles leadership and checkpoints properly.
3. flink-jobmanager at log level DEBUG - when nothing is working from the moment we try to deploy the jobs (even the jobs overview in the UI is stuck)

You can see that everything works fine when the cluster starts with no jobs:
all task managers are registered and communicating with the jobmanager without problems.

* BTW, Flink has this problem whenever some jobs are stuck in scheduling: the running-jobs overview in the JobManager UI is also stuck, and you can't see any jobs running even if there are some. It happened in earlier versions as well (1.9, 1.11, ...).

Thanks
Omer



On Thu, Feb 18, 2021 at 4:22 PM Till Rohrmann <[hidden email]> wrote:
Hi Omer,

could you share a bit more of the logs with us? I would be interested in what has happened before "Stopping DefaultLeaderRetrievalService" is logged. One problem you might run into is FLINK-20417. This problem should be fixed with Flink 1.12.2.
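(If you want to rule that out, bumping the image tag in both Deployments should be enough, assuming a 1.12.2 image with the same tag scheme is available:)

        image: flink:1.12.2-scala_2.11-java11   # was flink:1.12.1-scala_2.11-java11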


Cheers,
Till

On Thu, Feb 18, 2021 at 2:54 PM Omer Ozery <[hidden email]> wrote:
Hey guys,
It looks like the Flink cluster is deployed successfully; it starts with no errors.
But when we try to deploy the jobs, some of them start and some can't find available slots for some reason, even when we have free ones.
It happens with different jobs every time.
Below are the exceptions thrown by the components.
I have also attached an image showing the TaskManagers and the free slots.

The JobManager throws this error:
2021-02-17 11:19:41,956 INFO  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Stopping DefaultLeaderRetrievalService.
2021-02-17 11:19:41,956 INFO  org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver [] - Stopping KubernetesLeaderRetrievalDriver{configMapName='flink-cluster-1796779318657734fcbc261f8d01d250-jobmanager-leader'}.
2021-02-17 11:19:41,956 INFO  org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapWatcher [] - The watcher is closing.
2021-02-17 11:19:41,956 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registration of job manager [hidden email]://flink@flink-jobmanager:6123/user/rpc/jobmanager_19 failed.
2021-02-17 11:19:41,956 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registration of job manager [hidden email]://flink@flink-jobmanager:6123/user/rpc/jobmanager_19 failed.
2021-02-17 11:19:41,956 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registration of job manager [hidden email]://flink@flink-jobmanager:6123/user/rpc/jobmanager_19 failed.
2021-02-17 11:19:41,956 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registration of job manager [hidden email]://flink@flink-jobmanager:6123/user/rpc/jobmanager_19 failed.
2021-02-17 11:19:41,956 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Registration at ResourceManager was declined: java.lang.Exception: Job leader id service has been stopped.


java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[?:?]
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[?:?]
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
at org.apache.flink.runtime.scheduler.SharedSlot.cancelLogicalSlotRequest(SharedSlot.java:223) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator.cancelLogicalSlotRequest(SlotSharingExecutionSlotAllocator.java:168) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.scheduler.SharingPhysicalSlotRequestBulk.cancel(SharingPhysicalSlotRequestBulk.java:86) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkWithTimestamp.cancel(PhysicalSlotRequestBulkWithTimestamp.java:66) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:91) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:442) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:209) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.12.1.jar:1.12.1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.12.1.jar:1.12.1]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.12.1.jar:1.12.1]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.12.1.jar:1.12.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.12.1.jar:1.12.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.12.1.jar:1.12.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.12.1.jar:1.12.1]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.12.1.jar:1.12.1]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.12.1.jar:1.12.1]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.12.1.jar:1.12.1]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.12.1.jar:1.12.1]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.12.1.jar:1.12.1]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.12.1.jar:1.12.1]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.12.1.jar:1.12.1]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.12.1.jar:1.12.1]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.12.1.jar:1.12.1]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.12.1.jar:1.12.1]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.12.1.jar:1.12.1]
Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout
at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
... 24 more
Caused by: java.util.concurrent.TimeoutException: Timeout has occurred: 300000 ms
at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
... 24 more
2021-02-17 11:18:18,977 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Discarding the results produced by task execution 317da8a62cf57f74cf587e6bd733f5e7.
2021-02-17 11:18:18,977 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task e1ff02ab7657079c9d2254f12c031b2a_0.
2021-02-17 11:18:18,977 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 13 tasks should be restarted to recover the failed task e1ff02ab7657079c9d2254f12c031b2a_0.

The jobs throw this error:
2021-02-17 14:13:48
java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at org.apache.flink.runtime.scheduler.SharedSlot.cancelLogicalSlotRequest(SharedSlot.java:223)
at org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator.cancelLogicalSlotRequest(SlotSharingExecutionSlotAllocator.java:168)
at org.apache.flink.runtime.scheduler.SharingPhysicalSlotRequestBulk.cancel(SharingPhysicalSlotRequestBulk.java:86)
at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkWithTimestamp.cancel(PhysicalSlotRequestBulkWithTimestamp.java:66)
at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:91)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:442)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:209)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(Unknown Source)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout
at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86)
... 24 more
Caused by: java.util.concurrent.TimeoutException: Timeout has occurred: 300000 ms
... 25 more

Any suggestions?

Thanks
Omer

On Tue, Feb 16, 2021 at 6:54 PM Till Rohrmann <[hidden email]> wrote:
If you are running a session cluster, then Flink will create a config map for every submitted job. These config maps will unfortunately only be cleaned up when you shut down the cluster. This is a known limitation which we want to fix soon [1, 2].
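(Until that fix lands, leftover per-job ConfigMaps can also be cleaned up manually. A sketch, assuming the HA ConfigMaps are labelled with the cluster-id as app and with configmap-type=high-availability - please double-check the labels on your ConfigMaps first, as they may differ between versions:)

kubectl delete configmap \
  --namespace intel360-beta \
  --selector='app=flink-cluster,configmap-type=high-availability'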

If you can help us with updating the documentation properly (e.g. which role binding to use for the service account with minimal permissions), then we would highly appreciate your help.


Cheers,
Till

On Tue, Feb 16, 2021 at 3:45 PM Omer Ozery <[hidden email]> wrote:
Hey guys,
You are right, the documentation lacks this part, and Flink needs it to start.
I'm not sure it has 100% solved our problem, because it creates endless copies of the ConfigMaps with random IDs, and our jobs still can't be scheduled for some reason.
I will investigate this further with Daniel and let you know.
Also, the access control granted by that document is broad, imprecise and cluster-wide (it uses the default edit ClusterRole), so when you create the PR, please make sure that whoever is in charge of the Flink-Kubernetes integration documents the exact permissions to create and attach to Flink's components.

Thanks very much for your help!
we will keep you updated.
Omer


Reply | Threaded
Open this post in threaded view
|

Re: Flink’s Kubernetes HA services - NOT working

Till Rohrmann
Hmm, this is strange. From the logs it looks as if certain communications between components don't arrive at the receiver's end. I think we have to further dig into the problem.

In order to narrow it down further, could you try to start the cluster using pod IPs instead of K8s services for the inter-component communication? You can see how to configure this in [1]. That way we can make sure that it is not a problem with the K8s service.
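(If I remember the linked docs correctly, the idea is to expose each pod's IP via the downward API and pass it to the Flink process instead of relying on the service name. A rough sketch for the jobmanager container - adapt rather than copy, and the same idea applies to the taskmanagers:)

        env:
        - name: POD_IP
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
        # Bind the JobManager to the pod IP instead of the flink-jobmanager service name
        args: [ "jobmanager", "$(POD_IP)" ]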


Cheers,
Till

On Mon, Mar 1, 2021, 21:42 Omer Ozery <[hidden email]> wrote:
Hey Till,
Our Flink resource definitions are the same ones I shared earlier in the thread, generated using the helm template command (minus the log4j/metrics configuration and some sensitive data).
And there is no standby JobManager.

✌️

      - name: flink-config
        configMap:
          name: flink-config
      # NFS volumes
      - name: flink-checkpoints
        nfs:
          server: "my-nfs-server.my-org"
          path: "/my-shared-nfs-dir/flink/checkpoints"
      - name: flink-recovery
        nfs:
          server: "my-nfs-server.my-org"
          path: "/my-shared-nfs-dir/flink/recovery"
---
# Source: flink/templates/flink-ingress.yaml
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: jobmanager
spec:
  rules:
    - host: my.flink.job.manager.url
      http:
        paths:
          - path: /
            backend:
              serviceName: flink-jobmanager
              servicePort: 8081
---

as you can see, we are using the skeleton of the standalone configuration as it is documented here:
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/standalone/kubernetes.html
with some company-specific configuration obviously, but still within the scope of that document.

On a normal, beautiful day, without the HA configuration, everything works fine.
When trying to configure Kubernetes HA using this document:
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html
with the following parameters:
    kubernetes.cluster-id: flink-cluster
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: file:///opt/flink/recovery
   
the JobManager fails with the following error:
2021-02-14 16:57:19,103 ERROR io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector [] - Exception occurred while acquiring lock 'ConfigMapLock: default - flink-cluster-restserver-leader (54211907-eba9-47b1-813e-11f12ba89ccb)'
io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: GET at: https://10.96.0.1/api/v1/namespaces/default/configmaps/flink-cluster-restserver-leader. Message: Forbidden!Configured service account doesn't have access. Service account may have been revoked. configmaps "flink-cluster-restserver-leader" is forbidden: User "system:serviceaccount:intel360-beta:default" cannot get resource "configmaps" in API group "" in the namespace "default".

so we added the following line as well (as you can see in the flink-config ConfigMap above):
kubernetes.namespace: intel360-beta
Although it is not part of the document, and I don't think Flink should be aware of the namespace it resides in (it damages the modularity of the upper configuration layers), we added it regardless, and then got the following error:

2021-02-14 17:00:57,086 ERROR io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector [] - Exception occurred while acquiring lock 'ConfigMapLock: intel360-beta - flink-cluster-restserver-leader (66180ce6-c62e-4ea4-9420-0a3134bef3d6)'
io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: GET at: https://10.96.0.1/api/v1/namespaces/intel360-beta/configmaps/flink-cluster-restserver-leader. Message: Forbidden!Configured service account doesn't have access. Service account may have been revoked. configmaps "flink-cluster-restserver-leader" is forbidden: User "system:serviceaccount:intel360-beta:default" cannot get resource "configmaps" in API group "" in the namespace "intel360-beta".

which is basically the same error message, just directed at Flink's namespace.
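
For reference, the missing permission can be confirmed directly against the API server with kubectl's built-in authorization check (run as a cluster admin, since impersonation needs extra rights; the service account and namespace are taken from the error above):

kubectl auth can-i get configmaps \
  --as=system:serviceaccount:intel360-beta:default -n intel360-beta
kubectl auth can-i create configmaps \
  --as=system:serviceaccount:intel360-beta:default -n intel360-beta

If these answer "no", it matches the Forbidden errors.
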
My question is: do I need to add RBAC permissions to Flink's service account? I got the impression from the official Flink documentation and some blog responses that it is designed to function without any special permissions.
If we do need RBAC, can you give an official documentation reference with the exact permissions?

NOTE: as you can see, our checkpoint and recovery locations point to local directories mounted on an NFS share that is shared between all task managers and the job manager, since our infrastructure is bare-metal by design (although this particular cluster is hosted in AWS).

thanks in advance
Omer


---------- Forwarded message ---------
From: Daniel Peled <[hidden email]>
Date: Sun, Feb 14, 2021 at 6:18 PM
Subject: Fwd: Flink’s Kubernetes HA services - NOT working
To: <[hidden email]>




---------- Forwarded message ---------
From: Matthias Pohl <[hidden email]>
Date: Thu, Feb 11, 2021 at 5:37 PM
Subject: Re: Flink’s Kubernetes HA services - NOT working
To: Matthias Pohl <[hidden email]>
Cc: Daniel Peled <[hidden email]>, user <[hidden email]>

