Hello everyone,
I'm looking to run a PyFlink application in a distributed fashion on Kubernetes, and am currently facing issues. I've successfully gotten a Scala Flink application to run using the manifests provided at [0]. I attempted to run the PyFlink application by updating the jobmanager command args, but this didn't work. It resulted in the following error:

Caused by: java.lang.LinkageError: loader constraint violation: loader org.apache.flink.util.ChildFirstClassLoader @2d8f2f3a wants to load class org.apache.commons.cli.Options. A different class with the same name was previously loaded by 'app'. (org.apache.commons.cli.Options is in unnamed module of loader 'app')

I was able to get things to 'run' by changing the args, but I'm not sure whether things were actually running in a distributed fashion or not.

1/ Is there a good way to check if the task pods were being correctly utilized?
2/ Are there any similar examples to [0] for how to run PyFlink jobs on Kubernetes?

Open to any suggestions you may have.

Note: we'd prefer not to run using the native K8s route outlined at [1], because we need to maintain the ability to customize certain aspects of the deployment (e.g. mounting SSDs to some of the pods).

Thanks in advance!

[0]
Hi Kevin,

You are able to run PyFlink applications on a Kubernetes cluster; both native K8s mode and resource definition mode are supported since release-1.12.0. Currently, Python and PyFlink are not enabled in the official Flink Docker image, so you might need to build a custom image with Python and PyFlink installed; please refer to "Enable Python in Docker".

Generally, by setting the value of the args field in `jobmanager-application.yaml` to

args: ["standalone-job", "--python", "my_python_app.py", <optional arguments>, <job arguments>]

the jobmanager will try to submit a PyFlink job with the specified Python file once it is started.

You can check the pod status for the jobmanager and taskmanagers via `kubectl get pods [-n namespace]`. The jobmanager pod will turn to the Completed state once the job is finished, or the Error state if something goes wrong, while the taskmanager pods will always be in the Running state. Finally, it requires you to tear down the cluster by deleting all created resources (the jobmanager/taskmanager workloads, the flink-config configmap, the jobmanager service, etc.).

Best,
Shuiqiang

Kevin Lam <[hidden email]> wrote on Sat, Mar 6, 2021 at 5:29 AM:
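To make question 1/ concrete, here is a minimal sketch of checking whether the taskmanager pods are actually doing work. It assumes the resource names used in the manifests later in this thread (a flink-jobmanager service exposing the web UI on port 8081, and a flink-taskmanager deployment labeled app=flink):

# list the Flink pods; the taskmanager pods should be Running
kubectl get pods -l app=flink

# tail a taskmanager's log; tasks scheduled onto that pod log state
# transitions such as DEPLOYING -> RUNNING
kubectl logs -f deployment/flink-taskmanager

# forward the Flink web UI and inspect the job at http://localhost:8081;
# each parallel subtask shows which taskmanager it is running on
kubectl port-forward service/flink-jobmanager 8081:8081

The web UI's job view is the most direct check: if the parallelism is greater than 1 and the subtasks are spread across both taskmanager pods, the job really is running in a distributed fashion.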
Hi Kevin,

For your information, below is an example of running a PyFlink Table API WordCount job.

1. Building a Docker image with Python and PyFlink installed.

Dockerfile:

FROM flink:1.12.0

# install python3 and pip3
RUN apt-get update -y && \
    apt-get install -y python3.7 python3-pip python3.7-dev && rm -rf /var/lib/apt/lists/*
RUN ln -s /usr/bin/python3 /usr/bin/python

# install PyFlink
RUN pip3 install apache-flink==1.12.0

2. Resource definitions.

flink-configuration-configmap.yaml:

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
    parallelism.default: 2
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name = org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size = 100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF

jobmanager-service.yaml:

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager

jobmanager-application.yaml:

apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      containers:
        - name: jobmanager
          image: pyflink:v1
          env:
          args: ["standalone-job", "-py", "/opt/flink/examples/python/table/batch/word_count.py"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
          securityContext:
            runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties

taskmanager-deployment.yaml:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: pyflink:v1
        env:
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties

3. Creating resources (see the sketch below this message).

Best,
Shuiqiang

Shuiqiang Chen <[hidden email]> wrote on Sat, Mar 6, 2021 at 5:10 PM:
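For step 3 above, a minimal sketch of building the image and creating the resources. It assumes the image is tagged pyflink:v1 as referenced in the manifests (and is pushed somewhere the cluster can pull it from), and that the manifests are saved under the file names used above:

# step 1: build the custom image from the Dockerfile
docker build -t pyflink:v1 .

# step 3: create the resources; the jobmanager Job submits the
# PyFlink word count job once its pod starts
kubectl create -f flink-configuration-configmap.yaml
kubectl create -f jobmanager-service.yaml
kubectl create -f jobmanager-application.yaml
kubectl create -f taskmanager-deployment.yaml

# tear everything down once the jobmanager pod reaches Completed
kubectl delete -f taskmanager-deployment.yaml
kubectl delete -f jobmanager-application.yaml
kubectl delete -f jobmanager-service.yaml
kubectl delete -f flink-configuration-configmap.yaml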
Awesome, thanks Shuiqiang! I was able to get an example running by referencing your configs.

On Sat, Mar 6, 2021 at 7:12 AM Shuiqiang Chen <[hidden email]> wrote: