I have successfully deployed a Flink cluster in Kubernetes without JobManager high availability, and everything works great. The moment I enable high availability, the TaskManagers fail to contact the JobManager. My configurations and logs are below. Can someone point me in the correct direction? Many thanks!
Note the following:

- Cluster is running Flink 1.6.2.
- Logs for the JobManager and TaskManagers indicate that they can successfully connect to the Zookeeper instances.
- The Zookeeper instances reside as standard EC2 instances while the Flink cluster is fully contained in Kubernetes. Everything is within the same VPC.
- I have tried a TaskManager configuration which does not contain "high-availability: zookeeper" (i.e. launched the JM under one configuration and the TMs under another such that this is the only difference in the flink-conf.yaml file). This did not help.

============================================================================
TaskManager Log Snippet

2018-10-31 23:14:09,035 INFO akka.remote.transport.ProtocolStateActor - No response from remote for outbound association. Associate timed out after [20000 ms].
2018-10-31 23:14:09,035 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@flink-jobmanager:43982] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@flink-jobmanager:43982]] Caused by: [No response from remote for outbound association. Associate timed out after [20000 ms].]
2018-10-31 23:14:09,128 WARN akka.remote.transport.netty.NettyTransport - Remote connection to [null] failed with org.apache.flink.shaded.akka.org.jboss.netty.channel.ConnectTimeoutException: connection timed out: flink-jobmanager/100.70.34.197:43982
2018-10-31 23:14:19,135 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Could not resolve ResourceManager address akka.tcp://flink@flink-jobmanager:43982/user/resourcemanager, retrying in 10000 ms: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@flink-jobmanager:43982/), Path(/user/resourcemanager)]] after [10000 ms]. Sender[null] sent message of type "akka.actor.Identify"..
2018-10-31 23:14:28,065 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor - Fatal error occurred in TaskExecutor akka.tcp://flink@flink-taskmanager-75f746bdf7-fpw9h:38130/user/taskmanager_0.
org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException: Could not register at the ResourceManager within the specified maximum registration duration 300000 ms. This indicates a problem with this instance. Terminating now.

============================================================================
Configurations

JobManager Deployment

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: flink:1.6.2-hadoop28-scala_2.11
        command: ["/bin/sh", "-c", "cp /opt/flink/opt/flink-s3-fs-hadoop-1.6.2.jar /opt/flink/lib/flink-s3-fs-hadoop-1.6.2.jar && /docker-entrypoint.sh jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob
        - containerPort: 6125
          name: query
        - containerPort: 8081
          name: ui
        env:
        - name: JOB_MANAGER_RPC_ADDRESS
          value: flink-jobmanager
        - name: FLINK_CONF_DIR
          value: /etc/flink
        volumeMounts:
        - name: flink-config
          mountPath: /etc/flink
        - name: flink-hadoop-config
          mountPath: /etc/hadoop/conf
      volumes:
      - name: flink-config
        configMap:
          name: flink-config
      - name: flink-hadoop-config
        configMap:
          name: flink-hadoop-config

-----------------------------------------------------------------------------
JobManager Service

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  ports:
  - name: rpc
    port: 6123
  - name: blob
    port: 6124
  - name: query
    port: 6125
  - name: ui
    port: 8081
  selector:
    app: flink
    component: jobmanager

-----------------------------------------------------------------------------
TaskManager Deployment

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:1.6.2-hadoop28-scala_2.11
        command: ["/bin/sh", "-c", "cp /opt/flink/opt/flink-s3-fs-hadoop-1.6.2.jar /opt/flink/lib/flink-s3-fs-hadoop-1.6.2.jar && /docker-entrypoint.sh taskmanager"]
        ports:
        - containerPort: 6121
          name: data
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query
        env:
        - name: JOB_MANAGER_RPC_ADDRESS
          value: flink-jobmanager
        - name: FLINK_CONF_DIR
          value: /etc/flink
        resources:
          limits:
            memory: "2Gi"
          requests:
            memory: "2Gi"
        volumeMounts:
        - name: flink-config
          mountPath: /etc/flink
        - name: flink-hadoop-config
          mountPath: /etc/hadoop/conf
      volumes:
      - name: flink-config
        configMap:
          name: flink-config
      - name: flink-hadoop-config
        configMap:
          name: flink-hadoop-config

-----------------------------------------------
Hadoop ConfigMap

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-hadoop-config
  labels:
    app: flink
data:
  core-site.xml: |
    <?xml version="1.0"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <configuration>
      <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
      </property>
      <property>
        <name>fs.s3a.buffer.dir</name>
        <value>/tmp</value>
      </property>
    </configuration>

------------------------------------------------
Flink ConfigMap

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |
    blob.server.port: 6124
    high-availability: zookeeper
    high-availability.zookeeper.quorum: zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
    high-availability.storageDir: s3a://flink-test-bucket/flink-zk-storage
    high-availability.zookeeper.path.root: /flink-test
    high-availability.cluster-id: /flink-test
    jobmanager.heap.size: 1024m
    jobmanager.rpc.address: flink-jobmanager
    jobmanager.rpc.port: 6123
    parallelism.default: 1
    query.server.port: 6125
    rest.port: 8081
    state.backend.incremental: true
    state.backend: rocksdb
    state.checkpoints.dir: s3a://flink-test-bucket/flink-checkpoints
    taskmanager.heap.size: 1024m
    taskmanager.numberOfTaskSlots: 4
  log4j-console.properties: |
    log4j.rootLogger=INFO, console, file
    log4j.logger.akka=INFO
    log4j.logger.org.apache.kafka=INFO
    log4j.logger.org.apache.hadoop=INFO
    log4j.logger.org.apache.zookeeper=INFO
    log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

I've managed to resolve the issue. With HA enabled, you will see this message in the logs:

2018-11-01 13:38:52,467 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Actor system started at akka.tcp://flink@flink-jobmanager:40641

Without HA enabled, you will see this message in the logs:

2018-11-01 13:38:52,467 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Actor system started at akka.tcp://flink@flink-jobmanager:6123

HA causes a random port assignment for the ResourceManager portion of the JobManager. Because the flink-jobmanager Service only exposes ports 6123, 6124, 6125, and 8081, the TaskManagers cannot reach that randomly chosen port. This can be controlled by setting high-availability.jobmanager.port to a fixed port and exposing it in the Kubernetes network configuration.

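For anyone finding this later, here is a minimal sketch of that change against the manifests from the first message. It assumes port 6126 and the port name ha-rpc; both are illustrative choices and were not part of the original setup, so pick any free port that suits your cluster. Only the added lines are shown, as three separate fragments:

```yaml
# Fragment 1: add to flink-conf.yaml in the flink-config ConfigMap.
# 6126 is an assumed, otherwise unused port; without this setting the
# JobManager picks a random port when HA is enabled.
high-availability.jobmanager.port: 6126
---
# Fragment 2: extra entry under "ports" of the jobmanager container
# in the JobManager Deployment.
ports:
- containerPort: 6126
  name: ha-rpc        # hypothetical port name
---
# Fragment 3: extra entry under "spec.ports" of the flink-jobmanager
# Service, so TaskManagers can reach the fixed port by service name.
spec:
  ports:
  - name: ha-rpc      # hypothetical port name
    port: 6126
```

After applying the changes, the "Actor system started at" line in the JobManager log should show the fixed port rather than a random one.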
Hi John,

Glad you resolved the issue. Also, thanks for sharing the solution with the ML!

Best,
Dawid

On 01/11/2018 16:22, John Stone wrote:
> I've managed to resolve the issue. With HA enabled, you will see this message in the logs:
>
> 2018-11-01 13:38:52,467 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Actor system started at akka.tcp://flink@flink-jobmanager:40641
>
> Without HA enabled, you will see this message in the logs:
>
> 2018-11-01 13:38:52,467 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Actor system started at akka.tcp://flink@flink-jobmanager:6123
>
> HA causes a random port assignment for the ResourceManager portion of the JobManager. This can be controlled by setting the high-availability.jobmanager.port to a fixed port and exposing it in the Kubernetes network configuration.