TaskManagers cannot contact JobManager in Kubernetes when JobManager HA is enabled



John Stone
I have successfully deployed a Flink cluster in Kubernetes without JobManager high availability, and everything works. The moment I enable high availability, the TaskManagers fail to contact the JobManager. My configurations and logs are below. Can someone point me in the right direction? Many thanks!

Note the following:
- The cluster is running Flink 1.6.2.
- The JobManager and TaskManager logs indicate that they can successfully connect to the ZooKeeper instances.
- The ZooKeeper instances run as standard EC2 instances, while the Flink cluster runs entirely inside Kubernetes. Everything is within the same VPC.
- I have tried a TaskManager configuration that does not contain "high-availability: zookeeper" (i.e., I launched the JM with one configuration and the TMs with another, so that this setting was the only difference between the two flink-conf.yaml files). This did not help.

============================================================================

TaskManager Log Snippet

2018-10-31 23:14:09,035 INFO  akka.remote.transport.ProtocolStateActor                      - No response from remote for outbound association. Associate timed out after [20000 ms].
2018-10-31 23:14:09,035 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@flink-jobmanager:43982] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@flink-jobmanager:43982]] Caused by: [No response from remote for outbound association. Associate timed out after [20000 ms].]
2018-10-31 23:14:09,128 WARN  akka.remote.transport.netty.NettyTransport                    - Remote connection to [null] failed with org.apache.flink.shaded.akka.org.jboss.netty.channel.ConnectTimeoutException: connection timed out: flink-jobmanager/100.70.34.197:43982
2018-10-31 23:14:19,135 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Could not resolve ResourceManager address akka.tcp://flink@flink-jobmanager:43982/user/resourcemanager, retrying in 10000 ms: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@flink-jobmanager:43982/), Path(/user/resourcemanager)]] after [10000 ms]. Sender[null] sent message of type "akka.actor.Identify"..
2018-10-31 23:14:28,065 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor            - Fatal error occurred in TaskExecutor akka.tcp://flink@flink-taskmanager-75f746bdf7-fpw9h:38130/user/taskmanager_0.
org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException: Could not register at the ResourceManager within the specified maximum registration duration 300000 ms. This indicates a problem with this instance. Terminating now.

============================================================================

Configurations

JobManager Deployment

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: flink:1.6.2-hadoop28-scala_2.11
        command: ["/bin/sh", "-c", "cp /opt/flink/opt/flink-s3-fs-hadoop-1.6.2.jar /opt/flink/lib/flink-s3-fs-hadoop-1.6.2.jar && /docker-entrypoint.sh jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob
        - containerPort: 6125
          name: query
        - containerPort: 8081
          name: ui
        env:
        - name: JOB_MANAGER_RPC_ADDRESS
          value: flink-jobmanager
        - name: FLINK_CONF_DIR
          value: /etc/flink
        volumeMounts:
        - name: flink-config
          mountPath: /etc/flink
        - name: flink-hadoop-config
          mountPath: /etc/hadoop/conf
      volumes:
      - name: flink-config
        configMap:
          name: flink-config
      - name: flink-hadoop-config
        configMap:
          name: flink-hadoop-config

-----------------------------------------------------------------------------

JobManager Service

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  ports:
  - name: rpc
    port: 6123
  - name: blob
    port: 6124
  - name: query
    port: 6125
  - name: ui
    port: 8081
  selector:
    app: flink
    component: jobmanager

-----------------------------------------------------------------------------

TaskManager Deployment

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:1.6.2-hadoop28-scala_2.11
        command: ["/bin/sh", "-c", "cp /opt/flink/opt/flink-s3-fs-hadoop-1.6.2.jar /opt/flink/lib/flink-s3-fs-hadoop-1.6.2.jar && /docker-entrypoint.sh taskmanager"]
        ports:
        - containerPort: 6121
          name: data
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query
        env:
        - name: JOB_MANAGER_RPC_ADDRESS
          value: flink-jobmanager
        - name: FLINK_CONF_DIR
          value: /etc/flink
        resources:
          limits:
            memory: "2Gi"
          requests:
            memory: "2Gi"
        volumeMounts:
        - name: flink-config
          mountPath: /etc/flink
        - name: flink-hadoop-config
          mountPath: /etc/hadoop/conf
      volumes:
      - name: flink-config
        configMap:
          name: flink-config
      - name: flink-hadoop-config
        configMap:
          name: flink-hadoop-config

-----------------------------------------------

Hadoop ConfigMap

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-hadoop-config
  labels:
    app: flink
data:
  core-site.xml: |
    <?xml version="1.0"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <configuration>
      <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
      </property>
      <property>
        <name>fs.s3a.buffer.dir</name>
        <value>/tmp</value>
      </property>
    </configuration>

------------------------------------------------

Flink ConfigMap

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |
    blob.server.port: 6124
    high-availability: zookeeper
    high-availability.zookeeper.quorum: zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
    high-availability.storageDir: s3a://flink-test-bucket/flink-zk-storage
    high-availability.zookeeper.path.root: /flink-test
    high-availability.cluster-id: /flink-test
    jobmanager.heap.size: 1024m
    jobmanager.rpc.address: flink-jobmanager
    jobmanager.rpc.port: 6123
    parallelism.default: 1
    query.server.port: 6125
    rest.port: 8081
    state.backend.incremental: true
    state.backend: rocksdb
    state.checkpoints.dir: s3a://flink-test-bucket/flink-checkpoints
    taskmanager.heap.size: 1024m
    taskmanager.numberOfTaskSlots: 4
  log4j-console.properties: |
    log4j.rootLogger=INFO, console
    log4j.logger.akka=INFO
    log4j.logger.org.apache.kafka=INFO
    log4j.logger.org.apache.hadoop=INFO
    log4j.logger.org.apache.zookeeper=INFO
    log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

Re: TaskManagers cannot contact JobManager in Kubernetes when JobManager HA is enabled

John Stone
I've managed to resolve the issue.  With HA enabled, you will see this message in the logs:

2018-11-01 13:38:52,467 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Actor system started at akka.tcp://flink@flink-jobmanager:40641

Without HA enabled, you will see this message in the logs:

2018-11-01 13:38:52,467 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Actor system started at akka.tcp://flink@flink-jobmanager:6123

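With HA enabled, the JobManager's actor system, which hosts the ResourceManager that the TaskManagers register with, binds to a random port instead of jobmanager.rpc.port (6123), because high-availability.jobmanager.port defaults to 0. You can control this by setting high-availability.jobmanager.port to a fixed port and exposing that port in the Kubernetes network configuration (i.e., the flink-jobmanager Service).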
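A minimal sketch of that fix, written as excerpts of the manifests posted above. The port number 50010 and the Service port name "ha-rpc" are hypothetical choices, not values from this thread; any free port should work as long as flink-conf.yaml and the Service use the same number. Merge these changes into the existing manifests rather than applying the excerpts as-is, because the ConfigMap excerpt leaves out the other settings (and log4j-console.properties).

```yaml
# Sketch only: 50010 and "ha-rpc" are hypothetical choices; any free port works
# as long as flink-conf.yaml and the Service use the same number.
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |
    high-availability: zookeeper
    # Pin the JobManager RPC port used in HA mode (default "0" = random free port).
    high-availability.jobmanager.port: 50010
    # ... keep all other settings from the original flink-conf.yaml here ...
---
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  ports:
  - name: rpc
    port: 6123
  # New: expose the pinned HA RPC port so TaskManagers can reach
  # akka.tcp://flink@flink-jobmanager:50010 through this Service.
  - name: ha-rpc
    port: 50010
  - name: blob
    port: 6124
  - name: query
    port: 6125
  - name: ui
    port: 8081
  selector:
    app: flink
    component: jobmanager
```

Setting high-availability.jobmanager.port to 6123 should also work without touching the Service, since that port is already exposed and jobmanager.rpc.port is not what the JobManager binds to in HA mode. Adding a matching containerPort entry to the jobmanager container documents the port, but Kubernetes does not require it for the Service to route traffic.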

Re: TaskManagers cannot contact JobManager in Kubernetes when JobManager HA is enabled

Dawid Wysakowicz
Hi John,

Glad you resolved the issue. Also, thanks for sharing the solution with the mailing list!

Best,

Dawid

On 01/11/2018 16:22, John Stone wrote:
> I've managed to resolve the issue.  With HA enabled, you will see this message in the logs:
>
> 2018-11-01 13:38:52,467 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Actor system started at akka.tcp://flink@flink-jobmanager:40641
>
> Without HA enabled, you will see this message in the logs:
>
> 2018-11-01 13:38:52,467 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Actor system started at akka.tcp://flink@flink-jobmanager:6123
>
> HA causes a random port assignment for the ResourceManager portion of the JobManager.  This can be controlled by setting the high-availability.jobmanager.port to a fixed port and exposing it in the Kubernetes network configuration.

