Flink on K8s job submission best practices

Flink on K8s job submission best practices

Maximilian Bode
Hi everyone,

We are beginning to run Flink on K8s and found the basic templates [1] as well as the example Helm chart [2] very helpful. The discussion about JobManager HA [3] and Patrick's talk [4] were also very interesting. All in all, it is delightful how easily everything can be set up and how well it works out of the box.

Now we are looking for some best practices as far as job submission is concerned. Having played with a few alternative options, we would like to get some input on what other people are using. What we have looked into so far:
  1. Packaging the job jar into e.g. the JM image and submitting manually (either from the UI or via `kubectl exec`; a one-line example follows this list). Ideally, we would like to establish a more automated setup, preferably using native Kubernetes objects.
  2. Building a separate image whose responsibility it is to submit the job and keep it running. This could either use the REST API [5] or share the Flink config so that CLI calls connect to the existing cluster. If this is scheduled as a Kubernetes Deployment [6] and, e.g., the node running the client pod fails, one ends up with duplicate jobs. One could build custom logic (poll whether the job exists, only submit if it does not; see the sketch after this list), but this seems fragile, and it is conceivable that it could lead to weird timing issues, like different containers trying to submit at the same time. One solution would be an atomic submit-if-not-exists, but I suppose this would need to involve some level of locking on the JM.
  3. Scheduling the client container from the step above as a Kubernetes Job [7] (a manifest sketch follows this list). This seems somewhat unidiomatic for streaming jobs that are not expected to terminate, but one would not have to deal with duplicate Flink jobs. In the failure scenario described above, the (Flink) job would still be running on the Flink cluster; there just would not be a client attached to it (as the Kubernetes Job would not be restarted). On the other hand, should the (Flink) job fail for some reason, there is no way of restarting it automatically.
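
For option 1, the manual submission boils down to something like the following (the pod name and jar path are placeholders):

    kubectl exec flink-jobmanager-pod -- flink run -d /opt/flink/job.jar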
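
To make the race in option 2 concrete, here is a minimal Python sketch of such a submit-if-not-exists guard against the monitoring REST API [5]. The JobManager address, job name, and jar path are placeholders, and the check-then-submit sequence is deliberately not atomic, which is exactly the weakness described above:

    import requests  # third-party HTTP client, assumed available in the image

    JM = "http://flink-jobmanager:8081"   # placeholder JobManager service address
    JOB_NAME = "my-streaming-job"         # placeholder job name

    def job_is_running(name):
        # /joboverview lists running and finished jobs in the 1.4 monitoring API
        overview = requests.get(JM + "/joboverview").json()
        return any(job["name"] == name for job in overview.get("running", []))

    def submit(jar_path):
        # Upload the jar, then run its default entry class (endpoints from [5])
        upload = requests.post(JM + "/jars/upload",
                               files={"jarfile": open(jar_path, "rb")}).json()
        jar_id = upload["filename"].split("/")[-1]
        requests.post(JM + "/jars/%s/run" % jar_id).raise_for_status()

    if not job_is_running(JOB_NAME):
        # NOT atomic: a second client may submit between the check and this call
        submit("/opt/job/my-job.jar")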
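
And a sketch of option 3 as a Kubernetes Job [7]; the image name is hypothetical, and the container is assumed to run `flink run` in attached mode against the cluster, so the pod stays up as the client:

    apiVersion: batch/v1
    kind: Job
    metadata:
      name: flink-job-submitter        # placeholder name
    spec:
      backoffLimit: 3                  # retry a failed submission a few times
      template:
        spec:
          restartPolicy: OnFailure
          containers:
          - name: submitter
            image: example/flink-client:1.4   # hypothetical image with the job jar
            command: ["flink", "run", "-m", "flink-jobmanager:6123",
                      "/opt/job/my-job.jar"]
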
Are we missing something obvious? Has the Flink community come up with a default way of submitting Flink jobs on Kubernetes yet or are there people willing to share their experiences?

Best regards and happy holidays,
Max

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/kubernetes.html
[2] https://github.com/docker-flink/examples/tree/master/helm/flink
[3] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-HA-with-Kubernetes-without-Zookeeper-td15033.html
[4] https://www.youtube.com/watch?v=w721NI-mtAA Slides: https://www.slideshare.net/FlinkForward/flink-forward-berlin-2017-patrick-lucas-flink-in-containerland
[5] https://ci.apache.org/projects/flink/flink-docs-master/monitoring/rest_api.html#submitting-programs
[6] https://kubernetes.io/docs/concepts/workloads/controllers/deployment/
[7] https://kubernetes.io/docs/concepts/workloads/controllers/jobs-run-to-completion/
--
Maximilian Bode * [hidden email]
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Dr. Robert Dahlke, Gerhard Müller
Sitz: Unterföhring * Amtsgericht München * HRB 135082

Re: Flink on K8s job submission best practices

Martin Eden
The above applies to Mesos/DCOS as well. So if someone could also share insights into automatic job deployment in that setup, it would be very useful. Thanks.
M



Re: Flink on K8s job submission best practices

rmetzger0
Hi,

For the future, the FLIP-6 [1] work will solve the job submission problem in a nice way: you'll be able to build a Docker image containing both the job and the JobManager, basically a JobManager configured to only ever run this one job.
This way, by starting this image, you'll automatically also launch the job.
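
To sketch what such an image might look like: the following Dockerfile is hypothetical, since the FLIP-6 entrypoint (here called `standalone-job.sh`) and its flags are not part of a released Flink version at the time of writing:

    FROM flink:1.4        # base image; the real entrypoint will require a newer release
    # Bake the job jar into the image so it is on the JobManager's classpath.
    COPY target/my-job.jar /opt/flink/lib/my-job.jar
    # Hypothetical entrypoint: a JobManager that only ever runs this one job.
    CMD ["standalone-job.sh", "start-foreground", "--job-classname", "com.example.MyJob"]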

Until this feature is available [2], you have to build some tooling yourself. What people usually do (and this is also what dA Platform, a product by data Artisans that solves this problem among others, does) is use Flink's REST API to submit the job once the JobManager is available.
This gets problematic in the context of HA, because a restarted JobManager container might restart a failed HA job while your external tool is simultaneously submitting it again.
There are different solutions to this problem, like a) querying ZooKeeper to check whether there is a job to restore, or b) instead of submitting the job by posting it to the JM REST API, storing the job and job graph in ZooKeeper so that job submission effectively becomes a restore.
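
As a minimal sketch of "submit once the JobManager is available" (host, paths, and timing values are placeholders):

    import time

    import requests  # third-party HTTP client, assumed available

    JM = "http://flink-jobmanager:8081"  # placeholder JobManager service address

    # Block until the JobManager's REST endpoint answers, then submit once.
    while True:
        try:
            if requests.get(JM + "/overview", timeout=5).ok:
                break
        except requests.RequestException:
            pass
        time.sleep(2)

    # ...then upload and run the jar via /jars/upload and /jars/<id>/run as in
    # the sketch earlier in the thread. With HA, note that a restarted JM may
    # concurrently restore the same job from ZooKeeper, hence the race above.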


Regards,
Robert




Re: Flink on K8s job submission best practices

Christophe Jolif
In reply to this post by Maximilian Bode
Hi Maximilian,

Coming back on this as we have similar challenges.

I was leaning towards 3. But then I read your message and figured I might have missed something ;)

We agree that 3 is not idiomatic and creates a "detached" job, but in the absence of a proper solution I can live with that.

We also agree that, due to the nature of a (Kubernetes) Job, there is no risk of the (Flink) job being resubmitted when it should not be.

What I missed and would like to better understand is the risk of the (Flink) job not being restarted. If I'm not mistaken (and if the job is properly configured), in case of a (Flink) job failure, Flink will restart the job itself, so you don't have anything else to do. What type of failures do you have in mind when saying that with 3 the (Flink) job would not be restarted?
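
For example, with a fixed-delay restart strategy in flink-conf.yaml (values are placeholders), Flink restarts a failed job on its own, without any involvement of the client:

    # Restart a failed job up to 10 times, waiting 10 s between attempts.
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 10
    restart-strategy.fixed-delay.delay: 10 s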

Thanks!!
--
Christophe

