Running Pyflink job on K8s Flink Cluster Deployment?


Kevin Lam
Hello everyone,

I'm looking to run a PyFlink application in a distributed fashion on Kubernetes, and am currently facing issues. I've successfully gotten a Scala Flink application to run using the manifests provided at [0].

I attempted to run the application by updating the jobmanager command args from

 args: ["standalone-job", "--job-classname", "com.job.ClassName", <optional arguments>, <job arguments>] 
to 

args: ["standalone-job", "--python", "my_python_app.py", <optional arguments>, <job arguments>] 
But this didn't work. It resulted in the following error:

Caused by: java.lang.LinkageError: loader constraint violation: loader org.apache.flink.util.ChildFirstClassLoader @2d8f2f3a wants to load class org.apache.commons.cli.Options. A different class with the same name was previously loaded by 'app'. (org.apache.commons.cli.Options is in unnamed module of loader 'app'

I was able to get things to 'run' by setting args to:

args: ["python", "my_python_app.py", <optional arguments>, <job arguments>] 

But I'm not sure if things were running in a distributed fashion or not. 

1/ Is there a good way to check if the task pods were being correctly utilized?

2/ Are there any similar examples to [0] for how to run Pyflink jobs on kubernetes? 

Open to any suggestions you may have. Note: we'd prefer not to run using the native K8s route outlined at [1] because we need to maintain the ability to customize certain aspects of the deployment (e.g. mounting SSDs to some of the pods).

Thanks in advance!

[0] 


Re: Running Pyflink job on K8s Flink Cluster Deployment?

Shuiqiang Chen
Hi Kevin, 

You are able to run PyFlink applications on a Kubernetes cluster; both native K8s mode and resource definition mode are supported since release-1.12.0. Currently, Python and PyFlink are not enabled in the official Flink Docker image, so you might need to build a custom image with Python and PyFlink installed; please refer to "Enable Python in Docker" in the documentation.

Generally, by setting the args field in `jobmanager-application.yaml` to args: ["standalone-job", "--python", "my_python_app.py", <optional arguments>, <job arguments>], the job manager will try to submit a PyFlink job with the specified Python file once it is started. You can check the pod status for the job manager and task managers via `kubectl get pods [-n namespace]`. The job manager pod will turn to the Completed state once the job is finished (or the Error state if something goes wrong), while the task manager pods will always remain in the Running state.
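For example, to check that the task manager pods are actually picking up work, you can list the pods and inspect their logs (a minimal sketch, assuming your manifests label the pods with app: flink and component: jobmanager/taskmanager as in the standalone deployment examples; adjust the selectors and namespace to your setup):

$ kubectl get pods -l app=flink [-n namespace]
$ kubectl logs -l app=flink,component=taskmanager [-n namespace]
$ kubectl logs <jobmanager-pod-name> [-n namespace]

If the job is running in a distributed fashion, the task manager logs should show tasks being deployed and running rather than the pods sitting idle.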

Finally, you need to tear down the cluster yourself by deleting all created resources (the jobmanager/taskmanager workloads, the flink-config ConfigMap, the jobmanager service, etc.).
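For reference, a sketch of the teardown, assuming the resource names used in the standalone deployment manifests (flink-jobmanager Job and Service, flink-taskmanager Deployment, flink-config ConfigMap); substitute your own names if they differ:

$ kubectl delete job/flink-jobmanager
$ kubectl delete deployment/flink-taskmanager
$ kubectl delete service/flink-jobmanager
$ kubectl delete configmap/flink-config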

Best,
Shuiqiang 




Re: Running Pyflink job on K8s Flink Cluster Deployment?

Shuiqiang Chen
Hi Kevin,

For your information, below is an example of running a PyFlink Table API WordCount job.

1. Building a Docker image with Python and PyFlink installed:

Dockerfile:

FROM flink:1.12.0

# install python3 and pip3
RUN apt-get update -y && \
    apt-get install -y python3.7 python3-pip python3.7-dev && rm -rf /var/lib/apt/lists/*
RUN ln -s /usr/bin/python3 /usr/bin/python

# install PyFlink
RUN pip3 install apache-flink==1.12.0
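
You can then build the image and make it available to your cluster (a sketch; the pyflink:v1 tag is the one referenced by the manifests below, and the re-tag/push step is only needed if your cluster pulls from a remote registry):

$ docker build --tag pyflink:v1 .
# if needed, publish it to a registry your cluster can pull from, e.g.:
# $ docker tag pyflink:v1 <your-registry>/pyflink:v1
# $ docker push <your-registry>/pyflink:v1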

2. Resource definitions:

flink-configuration-configmap.yaml:

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
    parallelism.default: 2
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF

jobmanager-service.yaml:

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager

job-manager.yaml:

apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      containers:
        - name: jobmanager
          image: pyflink:v1
          env:
          args: ["standalone-job", "-py", "/opt/flink/examples/python/table/batch/word_count.py"] 
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
          securityContext:
            runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties

task-manager.yaml:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: pyflink:v1
        env:
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties

3. Creating resources:

$ kubectl create -f flink-configuration-configmap.yaml
$ kubectl create -f jobmanager-service.yaml
# Create the deployments for the cluster
$ kubectl create -f job-manager.yaml
$ kubectl create -f task-manager.yaml
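
To verify that the job is actually running across the task manager pods (a sketch, not part of the original example), you can watch the pods and port-forward the job manager's web UI, which is exposed on port 8081 by the service above:

$ kubectl get pods -l app=flink
$ kubectl logs job/flink-jobmanager
$ kubectl port-forward service/flink-jobmanager 8081:8081
# then open http://localhost:8081 and check that the task managers are registered
# and that the job's tasks are distributed across them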
Best,
Shuiqiang



Re: Running Pyflink job on K8s Flink Cluster Deployment?

Kevin Lam
Awesome, thanks Shuiqiang! I was able to get an example running by referencing your configs.
