Native kubernetes execution and History server

Native kubernetes execution and History server

Lukáš Drbal
Hi,

I would like to use native Kubernetes execution [1] for a batch job and let Kubernetes handle the scheduling. Flink version: 1.12.2.

Kubernetes job:
apiVersion: batch/v1beta1
kind: CronJob
metadata:
  name: scheduled-job
spec:
  schedule: "*/1 * * * *"
  jobTemplate:
    spec:
      template:
        metadata:
          labels:
            app: super-flink-batch-job
        spec:
          containers:
          - name: runner
            image: localhost:5000/batch-flink-app-v3:latest
            imagePullPolicy: Always
            command:
              - /bin/sh
              - -c
              - /opt/flink/bin/flink run-application --target kubernetes-application -Dkubernetes.service-account=flink-service-account -Dkubernetes.rest-service.exposed.type=NodePort -Dkubernetes.cluster-id=batch-job-cluster -Dkubernetes.container.image=localhost:5000/batch-flink-app-v3:latest -Ds3.endpoint=http://minio-1616518256:9000 -Ds3.access-key=ACCESSKEY -Ds3.secret-key=SECRETKEY -Djobmanager.archive.fs.dir=s3://flink/completed-jobs/ -Ds3.path-style-access=true -Ds3.ssl.enabled=false -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory -Dhigh-availability.storageDir=s3://flink/flink-ha local:///opt/flink/usrlib/job.jar
          restartPolicy: OnFailure


This works well for me, but I would also like to write the result to the archive path and show it in the History server (which runs as a separate deployment in Kubernetes).
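
For reference, the History server deployment just watches the same archive directory; the relevant flink-conf.yaml keys would be something like the following (a sketch, with the refresh interval at its default, and assuming the same S3 endpoint and credentials are configured on the History server side):

historyserver.archive.fs.dir: s3://flink/completed-jobs/
historyserver.archive.fs.refresh-interval: 10000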

Every run creates JobId=00000000000000000000000000000000, which obviously leads to:

Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has already been submitted.
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_282]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) ~[?:1.8.0_282]
at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1056) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:129) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:942) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.api.java.DataSet.collect(DataSet.java:417) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.api.java.DataSet.print(DataSet.java:1748) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:96) ~[?:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_282]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_282]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_282]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_282]
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
... 10 more

I assume this is because a completely new cluster is spawned for each run.

Can I somehow set the jobId, or am I trying to do something unsupported/bad?

Thanks for any advice.

L.


Re: Native kubernetes execution and History server

Guowei Ma
Hi,
After some offline discussion with Yang Wang, it seems there might have been a JobManager failover. Would you mind sharing the full JobManager log?
Best,
Guowei



Re: Native kubernetes execution and History server

Lukáš Drbal
Hello,

Sure. Here is the log from the first run, which succeeded: https://pastebin.com/tV75ZS5S
and here is the log from the second run (it is the same for all subsequent runs): https://pastebin.com/pwTFyGvE

My Dockerfile is pretty simple, just the WordCount example plus the S3 plugin:

FROM flink:1.12.2

# the job jar goes into usrlib, the directory referenced via local:///opt/flink/usrlib/ in the CronJob
RUN mkdir -p $FLINK_HOME/usrlib
COPY flink-examples-batch_2.12-1.12.2-WordCount.jar $FLINK_HOME/usrlib/wordcount.jar

# S3 filesystem support is loaded as a plugin
RUN mkdir -p ${FLINK_HOME}/plugins/s3-fs-presto
COPY flink-s3-fs-presto-1.12.2.jar $FLINK_HOME/plugins/s3-fs-presto/
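
Building and pushing it to the local registry used in the CronJob looks roughly like this (a sketch; it assumes a registry listening on localhost:5000, and the tag matches the image name above):

docker build -t localhost:5000/batch-flink-app-v3:latest .
docker push localhost:5000/batch-flink-app-v3:latest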


Thanks!


Re: Native kubernetes execution and History server

Guowei Ma
Hi, 
Thanks for providing the logs. From the logs, this is a known bug. [1]
Maybe you could use `$internal.pipeline.job-id` to set your own job id (thanks to Yang Wang).
But keep in mind that this option is only for internal use and may change in a future release, so you should keep an eye on [1] for the proper solution.
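
For illustration, passing it on the command line might look like the following sketch (the 32-hex-character id is a made-up placeholder and most options are left out; note the leading "$" has to be escaped so the shell does not expand it as a variable):

/opt/flink/bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=batch-job-cluster \
    -D\$internal.pipeline.job-id=1b5c3f2a9c0d71e44b5e8a1b2c3d4e5f \
    local:///opt/flink/usrlib/job.jar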


Best,
Guowei



Reply | Threaded
Open this post in threaded view
|

Re: Native kubernetes execution and History server

Lukáš Drbal
Hello Guowei,

I just checked it and it works!

Thanks a lot!

Here is the workaround, which uses a UUID as the jobId:  -D\$internal.pipeline.job-id=$(cat /proc/sys/kernel/random/uuid|tr -d "-")
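
In case it helps anyone, here is the same thing spelled out (a sketch with only the relevant options; the escaped "\$" keeps the option name literal for the shell, while the $(...) substitution runs when the container starts, so each scheduled run gets a fresh id):

# a UUID with its dashes removed is exactly 32 hex characters, a valid JobID
JOB_ID=$(cat /proc/sys/kernel/random/uuid | tr -d "-")
/opt/flink/bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=batch-job-cluster \
    -D\$internal.pipeline.job-id="$JOB_ID" \
    local:///opt/flink/usrlib/job.jar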


L.


Re: Native kubernetes execution and History server

Yang Wang
Thanks Guowei for the comments and Lukáš Drbal for sharing the feedback.

I think this affects not only Kubernetes application mode but also YARN and standalone application mode:
in HA mode, the job id will be set to ZERO if it is not configured explicitly.

For standalone application mode, "--job-id" can be used to specify a user-defined job id, as sketched below.
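
A rough sketch for standalone application mode (the entrypoint class and the id are illustrative only):

./bin/standalone-job.sh start \
    --job-classname org.apache.flink.examples.java.wordcount.WordCount \
    --job-id 2c7d4e1f8a9b03c5d6e7f8a9b0c1d2e3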

However, for YARN and Kubernetes applications we do not have a public config option for this.
Since we might support multiple jobs in a single Flink application with HA enabled in the future,
introducing such a public config option may be inopportune.


Best,
Yang
