Recommendation for dealing with Job Graph incompatibility across varying Flink versions


Sonam Mandal
Hello,

We are exploring running multiple Flink clusters within a single Kubernetes cluster, with each Flink cluster running a specified Flink image version. Because the Flink job graph must be compatible with the Flink version running in the target cluster, the challenge is ensuring that the SQL job graph or Flink job JARs are compatible with whichever Flink cluster users want to run them on.

For example, if the Flink cluster is running version 1.12.1, the job graph generated from the SQL must be created using compatible 1.12.1 Flink libraries. Otherwise we see failures such as deserialization errors.
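
To make the constraint concrete, here is a minimal sketch of a pre-submission check. It assumes the JobManager's REST endpoint is reachable and that its `/config` response carries a `flink-version` field. The function names are hypothetical, and whether an exact match or only a matching major.minor line is required is an assumption to validate for your own jobs.

```python
import json
from urllib.request import urlopen


def parse_version(version):
    """Split a version string such as '1.12.1' into a tuple of ints."""
    # Drop any suffix such as '-SNAPSHOT' before splitting on dots.
    core = version.split("-", 1)[0]
    return tuple(int(part) for part in core.split("."))


def is_compatible(client_version, cluster_version, exact=True):
    """Return True if a client built against client_version may submit.

    exact=True requires identical versions (1.12.1 vs 1.12.1);
    exact=False only requires the same major.minor line. Which rule is
    actually safe is an assumption, not something Flink guarantees here.
    """
    client = parse_version(client_version)
    cluster = parse_version(cluster_version)
    if exact:
        return client == cluster
    return client[:2] == cluster[:2]


def fetch_cluster_version(rest_url):
    """Read the 'flink-version' field from the JobManager's /config endpoint."""
    with urlopen(rest_url.rstrip("/") + "/config") as response:
        return json.load(response)["flink-version"]
```

A submitting client could call `fetch_cluster_version` with the JobManager's REST address and refuse to build a job graph when `is_compatible` returns False, instead of failing later during deserialization on the cluster.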

Is there a recommended way to handle this scenario today? 

Thanks,
Sonam

Re: Recommendation for dealing with Job Graph incompatibility across varying Flink versions

Paul K Moore
Hi Sonam,

I am not a long-standing Flink user (3 months only) so perhaps others will have a more authoritative view.

I am running Flink on k8s and have had good success with the Google Flink operator (https://github.com/GoogleCloudPlatform/flink-on-k8s-operator). It includes Custom Resource Definitions (CRDs), so you can define your Flink clusters in YAML and deploy them using kustomize.

The result is:

A Flink cluster consisting of a job-manager and one or more task-managers.
A Kubernetes job, the job-submitter, which acts as the Flink “client” and submits the job to the job-manager.

e.g.

flink-example-job-submitter-g4s6g   0/1     Completed   0          6d15h
flink-example-jobmanager-0          1/1     Running     3          6d15h
flink-example-taskmanager-0         1/1     Running     3          6d15h

This all seems in keeping with Flink’s “Per-Job Mode” deployment option (https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/overview/#per-job-mode).

Note: I’m only just getting into state persistence and recovery, so there is still some work to do, but I think that is largely a matter of understanding and configuration.

Hope that helps

Paul

On 17 Jun 2021, at 23:55, Sonam Mandal <[hidden email]> wrote:



Re: Recommendation for dealing with Job Graph incompatibility across varying Flink versions

Sonam Mandal
Hi Paul,

Thanks for getting back to me. I did take a look at the Google Go operator, and it uses the /bin/flink client for job submission. My understanding is that in this scenario users must ensure that their job JAR is compatible with the Flink version, and the client just takes care of the submission. Do let me know whether I have understood this correctly.

Perhaps some context on what we are doing will help. We are writing a small client that takes SQL statements, runs them through the Table environment, and submits the resulting job. The /bin/flink client does not accept SQL out of the box, and we cannot expect users to run a SQL shell for their production SQL streaming services. Since we generate the job graph ourselves, we have found that our client needs to be compiled against the same version of Flink that the cluster is running; otherwise we hit job graph compatibility issues. So I wanted to understand whether there is a recommendation for handling this conversion when different users may run different Flink versions in a given Kubernetes cluster.
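
One way to picture the problem is as a lookup from the cluster's Flink version to a client build compiled against that version. The sketch below is only an illustration under that assumption: the artifact names and the `pick_client_build` helper are hypothetical, and falling back to a build from the same major.minor line is a guess that would need to be validated.

```python
def minor_line(version):
    """Reduce '1.12.1' to its (major, minor) pair, here (1, 12)."""
    major, minor = version.split(".")[:2]
    return int(major), int(minor)


def pick_client_build(cluster_version, builds):
    """Return the client artifact built against the cluster's Flink line.

    `builds` maps the Flink version each client was compiled against to a
    (hypothetical) artifact name. Raises LookupError when nothing matches,
    rather than submitting a job graph the cluster may fail to deserialize.
    """
    # Prefer a build compiled against exactly the cluster's version.
    if cluster_version in builds:
        return builds[cluster_version]
    # Otherwise fall back to any build from the same major.minor line;
    # sorting only keeps the choice deterministic.
    wanted = minor_line(cluster_version)
    for built_against in sorted(builds, reverse=True):
        if minor_line(built_against) == wanted:
            return builds[built_against]
    raise LookupError("no SQL client build for Flink " + cluster_version)


# Hypothetical artifacts, one per supported Flink version.
BUILDS = {
    "1.12.1": "sql-client-flink-1.12.1.jar",
    "1.13.0": "sql-client-flink-1.13.0.jar",
}
```

With this shape, supporting a new Flink version means publishing one more client build and adding one entry to the mapping, while an unsupported cluster version fails early with a clear error.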

Any thoughts?

Thanks,
Sonam 

From: Paul K Moore <[hidden email]>
Sent: Friday, June 18, 2021 2:25:52 AM
To: Sonam Mandal <[hidden email]>
Cc: [hidden email] <[hidden email]>; Srinivasulu Punuru <[hidden email]>
Subject: Re: Recommendation for dealing with Job Graph incompatibility across varying Flink versions
 