Multiple Job Managers in Flink HA Setup

Multiple Job Managers in Flink HA Setup

Steven Nelson
Hello!

I am having some difficulty with multiple job managers in an HA setup using Flink 1.9.0. 

I have 2 job managers and have set up HA with the following config:

high-availability: zookeeper
high-availability.cluster-id: /imet-enhance
high-availability.storageDir: hdfs:///flink/ha/
high-availability.zookeeper.quorum: flink-state-hdfs-zookeeper-1.flink-state-hdfs-zookeeper-headless.default.svc.cluster.local:2181,flink-state-hdfs-zookeeper-2.flink-state-hdfs-zookeeper-headless.default.svc.cluster.local:2181,flink-state-hdfs-zookeeper-0.flink-state-hdfs-zookeeper-headless.default.svc.cluster.local:2181
high-availability.zookeeper.path.root: /flink
high-availability.jobmanager.port: 50000-50025

I have the job managers behind a load balancer inside a Kubernetes cluster.

They work great except for one thing. When I use the web UI (or API) to upload the JAR file and start the job, the request sometimes goes to a different job manager, which doesn't have the JAR file in its temporary directory, so the job fails to start.

In the 1.7 version of this setup, the second Job Manager would return a redirect. I put an HAProxy in front of the Job Managers that only allowed traffic to flow to the one that wasn't returning a 300, and this worked well for everything. In 1.9 it appears that both Job Managers are able to respond (via the internal proxy mechanism I have seen in prior emails). However, it appears the web file cache is still not shared between them.

I also tried attaching a shared NFS folder to the two machines and setting their web.tmpdir property to the shared folder; however, it appears that each job manager creates a separate subdirectory inside that directory.

My end goals are:
1) Provide a fault-tolerant Flink cluster.
2) Provide a persistent storage directory for the JAR file so I can rescale without re-uploading it.

Thoughts?
-Steve

Re: Multiple Job Managers in Flink HA Setup

Gary Yao-4
Hi Steve,

> I also tried attaching a shared NFS folder between the two machines and
> tried to set their web.tmpdir property to the shared folder, however it
> appears that each job manager creates a separate subdirectory inside that
> directory.

You can create a fixed upload directory via the config option 'web.upload.dir'
[1]. To avoid race conditions, it is probably best to make sure that the
directory already exists before starting the JMs (if the path does not exist,
both JMs may attempt to create it).
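
As a rough illustration of the "create it up front" step, here is a minimal
sketch that could run once (for example from a deployment script) before
either JM starts. The helper name and the example path are hypothetical; the
only assumption about Flink is that 'web.upload.dir' in flink-conf.yaml is then
set to the same path on both JMs.

```python
import os


def ensure_upload_dir(path: str, mode: int = 0o775) -> str:
    """Create the shared upload directory if it is missing.

    Safe to call repeatedly: an already existing directory is left untouched.
    Returns the absolute path so it can be copied into flink-conf.yaml.
    """
    # exist_ok=True makes the call idempotent instead of failing on reruns.
    os.makedirs(path, mode=mode, exist_ok=True)
    # Fail early if the process cannot write where uploads will be stored.
    if not os.access(path, os.W_OK):
        raise PermissionError(f"upload directory is not writable: {path}")
    return os.path.abspath(path)


# Hypothetical usage, with an NFS mount assumed at /mnt/flink-shared:
#   ensure_upload_dir("/mnt/flink-shared/uploads")
# and in flink-conf.yaml on both JMs:
#   web.upload.dir: /mnt/flink-shared/uploads
```

Running this from a single place before the JMs start avoids the situation
where both of them try to create the directory at the same time.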

Alternatively you can try one of the following:
- Do not use stand-by masters
- Find the leader address from ZooKeeper, and issue a request directly [2]
- Use Flink CLI, which will resolve the leading JM from ZooKeeper. Note that
 the CLI submits the job by uploading a serialized JobGraph [2][3][4][5] (you
 could also rebuild that part of the CLI if you need programmatic job
 submission).
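
For the ZooKeeper option, the retrieval code linked in [2] reads the leader
information with an ObjectInputStream: first a UTF string (the address), then
a UUID. Below is a minimal sketch of decoding just the address from those
bytes outside the JVM. It assumes you have already fetched the raw znode data
with some ZooKeeper client; the znode path depends on your HA settings and is
not shown here, and the function name is made up for illustration.

```python
import struct

STREAM_HEADER = b"\xac\xed\x00\x05"  # Java serialization magic + version
TC_BLOCKDATA = 0x77      # block data with a 1-byte length
TC_BLOCKDATALONG = 0x7A  # block data with a 4-byte length


def parse_leader_address(data: bytes) -> str:
    """Return the address string written via writeUTF at the start of a
    Java object stream. The serialized UUID that follows is ignored."""
    if len(data) < 6 or data[:4] != STREAM_HEADER:
        raise ValueError("not a Java serialization stream")
    tag = data[4]
    if tag == TC_BLOCKDATA:
        block_len = data[5]
        block_start = 6
    elif tag == TC_BLOCKDATALONG:
        if len(data) < 9:
            raise ValueError("truncated block header")
        (block_len,) = struct.unpack(">i", data[5:9])
        block_start = 9
    else:
        raise ValueError(f"unexpected stream tag: {tag:#x}")
    block = data[block_start:block_start + block_len]
    if len(block) < 2:
        raise ValueError("block too short to hold a UTF string")
    # writeUTF stores a 2-byte big-endian length followed by the bytes.
    (utf_len,) = struct.unpack(">H", block[:2])
    raw = block[2:2 + utf_len]
    if len(raw) != utf_len:
        raise ValueError("truncated UTF string")
    # Java uses "modified UTF-8"; it matches UTF-8 for plain ASCII
    # addresses, which is what this sketch assumes.
    return raw.decode("utf-8")
```

With the address in hand, the upload and the run request can both be sent to
that one JM instead of going through the load balancer.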

Lastly, I want to point out that the programmatic job submission is currently
being reworked (see [6] for details).

> 2) provide a persistent storage directory for the Jar file so I can perform
> rescaling without needing to re-upload the jar file.

Can you describe how you are rescaling?


Best,
Gary

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#web-upload-dir
[2] https://github.com/apache/flink/blob/b6e32a317ec273ee3f3085728a02bc3922c22db6/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java#L162
[3] https://github.com/apache/flink/blob/b6e32a317ec273ee3f3085728a02bc3922c22db6/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java#L215
[4] https://github.com/apache/flink/blob/b6e32a317ec273ee3f3085728a02bc3922c22db6/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java#L79
[5] https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/rest_api.html#jobs-1
[6] https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E

On Fri, Sep 20, 2019 at 10:57 PM Steven Nelson <[hidden email]> wrote:
> [...]

Re: Multiple Job Managers in Flink HA Setup

Yang Wang
Hi Steven,

I have tested a standalone cluster on Kubernetes with 2 JobManagers.
Submitting Flink jobs through either the active or the standby JobManager
web UI worked fine. So I think the problem may be with your HAProxy. Does
your HAProxy forward traffic to both the active and the standby JobManager?
I think simple random or round-robin balancing should be enough.

Best,
Yang

Gary Yao <[hidden email]> wrote on Wed, Sep 25, 2019 at 9:10 PM:
> [...]