Migration to Flip6 Kubernetes


Edward Rojas
Hello,

Currently I have a Flink 1.4 cluster running on Kubernetes, based on the
configuration described at
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/kubernetes.html
with additional configuration for HA with ZooKeeper.

With this setup I have several TaskManagers and a single JobManager, and I
create a container for each job to perform the job submission and manage job
updates with savepoints.


I'm looking into what would be needed to migrate to the new FLIP-6
architecture, as we are planning to use Flink 1.5 once it's ready.

If I understand correctly from
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
and the current code on master:

* TaskManagers would stay the same, i.e. they will execute the
taskmanager.sh start-foreground script, which with the flip6 mode activated
will execute the new taskexecutor.TaskManagerRunner.

* We will now have one JobManager per job, which is really good, but I don't
fully understand how it would be started.

I noticed that jobmanager.sh with the flip6 mode activated will execute
entrypoint.StandaloneSessionClusterEntrypoint, but I don't see how we could
pass the job jar and parameters (?)

So I think the other possibility to start the job would be via the flink run
command, maybe with an option to say that we are creating a job with its own
job manager, or would this be the default behaviour?

Or would this be the role of the JobMaster? I haven't looked at its code, but
it's mentioned on the FLIP-6 page. (However, I don't see an entrypoint for it
in the scripts (?))

Could you help me understand how this is expected to be done?


* Also, I'm not sure whether it would be better to have a ResourceManager per
job or a single ResourceManager per cluster. The page states that there is a
ResourceManager for Self-contained-single-job, but it seems to me that it
needs to have the information about all JobManagers and TaskManagers (?)


Thanks in advance for any help you can provide.

I'm interested in using FLIP-6 on Kubernetes once it is ready, so I
could help with some testing if needed.

--
Edward



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Migration to Flip6 Kubernetes

Till Rohrmann
Hi Edward,

You're right that Flink's Kubernetes documentation has not been updated with respect to FLIP-6. Updating it is one of the tasks for the Flink 1.5 release testing and is still pending.

A Flink cluster can be run in two modes: session mode and per-job mode. The former starts a cluster to which you can submit multiple jobs. All jobs in the cluster share the same ResourceManager and a Dispatcher, which is responsible for spawning JobMasters, each of which executes a single job. The latter starts a Flink cluster which is pre-initialized with a JobGraph and only runs this job. Here we also start a ResourceManager and a MiniDispatcher, whose only job is to start a single JobMaster with the pre-initialized JobGraph.

StandaloneSessionClusterEntrypoint is the entrypoint for the session mode.

The JobClusterEntrypoint is the entrypoint for the per-job mode. Take a look at YarnJobClusterEntrypoint to see how the entrypoint retrieves the JobGraph from HDFS and then automatically starts executing it. There is no script which directly starts this entrypoint, but the YarnClusterDescriptor uses it when `deployJobCluster` is called.

Which entrypoint you call depends on what you want to achieve. To build generic K8 images to which you can submit any number of Flink jobs, call into the SessionClusterEntrypoint. To build a special image which contains the single job you want to execute, call into the JobClusterEntrypoint. Once a session cluster is running, you can use bin/flink run to submit a job to it.
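
As a rough illustration of the two modes described above, here is a minimal Java sketch. It is not Flink code: the class DeploymentModeSketch, its enum and its methods are hypothetical and only restate, as a lookup, which entrypoint and which components the message associates with each mode.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative model only. None of these types are Flink classes; the
// strings are the component and entrypoint names used in the message above.
class DeploymentModeSketch {

    enum Mode { SESSION, PER_JOB }

    /** Entrypoint the message above names for each mode. */
    static String entrypointFor(Mode mode) {
        switch (mode) {
            case SESSION:
                return "StandaloneSessionClusterEntrypoint";
            case PER_JOB:
                return "JobClusterEntrypoint";
            default:
                throw new IllegalArgumentException("unknown mode: " + mode);
        }
    }

    /** Cluster-level components started in each mode, as described above. */
    static List<String> componentsFor(Mode mode) {
        switch (mode) {
            case SESSION:
                // The Dispatcher spawns one JobMaster per submitted job.
                return Arrays.asList("ResourceManager", "Dispatcher");
            case PER_JOB:
                // The MiniDispatcher starts a single JobMaster for the
                // pre-initialized JobGraph.
                return Arrays.asList("ResourceManager", "MiniDispatcher");
            default:
                throw new IllegalArgumentException("unknown mode: " + mode);
        }
    }

    /** Only a session cluster accepts jobs submitted after start-up. */
    static boolean acceptsJobSubmissions(Mode mode) {
        return mode == Mode.SESSION;
    }

    public static void main(String[] args) {
        for (Mode mode : Mode.values()) {
            System.out.println(mode + ": " + entrypointFor(mode)
                    + " starts " + componentsFor(mode)
                    + ", accepts submissions: " + acceptsJobSubmissions(mode));
        }
    }
}
```

The point of the sketch is only that the choice of image (generic vs. job-specific) fixes the entrypoint, and the entrypoint fixes which dispatcher variant runs.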

Let me know if you have other questions.

Cheers,
Till

On Thu, Mar 15, 2018 at 7:53 PM, Edward Rojas <[hidden email]> wrote:

Re: Migration to Flip6 Kubernetes

Edward Rojas
Hi Till,

Thanks for the information. We are using the session cluster and it is
working quite well, but we would like to benefit from the new per-job mode in
order to have better control over the jobs that are running on the cluster.

I took a look at the YarnJobClusterEntrypoint and I see now how this is
planned to be done, but if I understand correctly, in the current state it is
not possible to start a job cluster on Kubernetes, as there are only concrete
implementations for YARN and Mesos?

The objective being to have a Flink cluster running in per-job mode and able
to execute several self-contained jobs, I imagine the idea would also be to
have a Kubernetes-specific implementation of the ResourceManager that would
be initialized along with the TaskManagers and would listen for the
"self-contained jobs" to join, assign resources and start the execution of
each job, each one with its own JobManager?

Is my understanding correct?
Is the per-job mode on Kubernetes planned to be included in 1.5?

Regards,
Edward





Re: Migration to Flip6 Kubernetes

Eron Wright
In reply to this post by Till Rohrmann
It would be helpful to expand on how, in job mode, the job graph would be produced. The phrase 'which contains the single job you want to execute' has a few meanings; I believe Till means a serialized job graph, not an executable JAR with a main method. Till, is that correct?

On Tue, Mar 20, 2018 at 2:16 AM, Till Rohrmann <[hidden email]> wrote:

Re: Migration to Flip6 Kubernetes

Till Rohrmann
Hi Edward and Eron,

You're right that there is currently no JobClusterEntrypoint implementation for Kubernetes. What this entrypoint looks like mostly depends on how the job is stored and retrieved. Several approaches are conceivable:

- The entrypoint connects to an external system from which it fetches the JobGraph
- The entrypoint contains the serialized JobGraph similar to how the YarnJobClusterEntrypoint works, but this would mean that you have a separate image per job
- The entrypoint actually executes a user jar which generates the JobGraph similar to what happens on the client when you submit a job
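
To make the three options above a bit more concrete, here is a minimal Java sketch. Everything in it is hypothetical: JobGraphStub stands in for Flink's JobGraph, and the Retriever interface and factory methods are invented for illustration, not taken from Flink. The external system and the user program are simulated with plain suppliers.

```java
import java.util.function.Supplier;

// Illustrative only: these are not Flink classes or Flink APIs.
class JobGraphRetrievalSketch {

    /** Stand-in for Flink's JobGraph; it only carries a job name here. */
    static final class JobGraphStub {
        final String jobName;

        JobGraphStub(String jobName) {
            this.jobName = jobName;
        }
    }

    /** Hypothetical strategy interface: how the entrypoint gets its job. */
    interface Retriever {
        JobGraphStub retrieve();
    }

    // Option 1: fetch the job from an external system (simulated lookup).
    static Retriever fromExternalSystem(Supplier<String> remoteLookup) {
        return () -> new JobGraphStub(remoteLookup.get());
    }

    // Option 2: the job is shipped inside the image, so each job needs its
    // own image (simulated by a value fixed at construction time).
    static Retriever fromImage(String bakedInJobName) {
        return () -> new JobGraphStub(bakedInJobName);
    }

    // Option 3: run user code that builds the job, like the client does on
    // submission (simulated by a supplier standing in for the user's main).
    static Retriever fromUserProgram(Supplier<JobGraphStub> userProgram) {
        return userProgram::get;
    }

    /** The entrypoint itself does not care which strategy was chosen. */
    static String startJobCluster(Retriever retriever) {
        JobGraphStub jobGraph = retriever.retrieve();
        return "running " + jobGraph.jobName;
    }

    public static void main(String[] args) {
        System.out.println(startJobCluster(fromExternalSystem(() -> "job-from-store")));
        System.out.println(startJobCluster(fromImage("job-in-image")));
        System.out.println(startJobCluster(fromUserProgram(() -> new JobGraphStub("job-from-main"))));
    }
}
```

The design point is that the entrypoint could stay the same across all three options if the retrieval step is kept behind one small interface.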

I'm not a Kubernetes expert and therefore I don't know what the most idiomatic approach is. But once we have figured this out, it should not be too difficult to write the Kubernetes JobClusterEntrypoint.

If we say that Kubernetes is responsible for assigning new resources, then we need a special KubernetesResourceManager which automatically assigns all registered slots to the single JobMaster. This JobMaster would then accept all slots and scale the job to the number of slots it was offered. That way we could easily let K8 control the resources.
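
The passive variant described above can be sketched in a few lines of Java. This is a toy model under stated assumptions, not the KubernetesResourceManager itself: the classes PassiveResourceManager and SingleJobMaster and their methods are hypothetical, and "scaling the job" is reduced to reporting a parallelism equal to the number of slots received.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: these are not Flink classes.
class PassiveSlotAssignmentSketch {

    /** The single JobMaster of a per-job cluster. */
    static final class SingleJobMaster {
        private final List<String> acceptedSlots = new ArrayList<>();

        /** Accepts every slot it is offered. */
        void offerSlot(String slotId) {
            acceptedSlots.add(slotId);
        }

        /** The job scales to however many slots were offered. */
        int parallelism() {
            return acceptedSlots.size();
        }
    }

    /**
     * A resource manager that never requests resources itself. Whenever a
     * TaskManager registers (because Kubernetes started a pod), all of its
     * slots are handed to the single JobMaster.
     */
    static final class PassiveResourceManager {
        private final SingleJobMaster jobMaster;

        PassiveResourceManager(SingleJobMaster jobMaster) {
            this.jobMaster = jobMaster;
        }

        void registerTaskManager(String taskManagerId, int numberOfSlots) {
            for (int i = 0; i < numberOfSlots; i++) {
                jobMaster.offerSlot(taskManagerId + "/slot-" + i);
            }
        }
    }

    public static void main(String[] args) {
        SingleJobMaster jobMaster = new SingleJobMaster();
        PassiveResourceManager resourceManager = new PassiveResourceManager(jobMaster);

        // Kubernetes decides how many TaskManager pods exist; Flink follows.
        resourceManager.registerTaskManager("tm-1", 2);
        resourceManager.registerTaskManager("tm-2", 3);

        System.out.println("job parallelism: " + jobMaster.parallelism());
    }
}
```

In this model, adding TaskManager pods is the only way to give the job more slots; Flink never asks for resources on its own.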

If there is a way to communicate with K8 from within Flink, then we could also implement a mode which is similar to Flink's Yarn integration. The K8RM would then ask for new pods to be started if the JM needs more slots.

Unfortunately, the per-job mode on K8 won't make it into Flink 1.5. But I'm confident that the community will address this issue with Flink 1.6.

Cheers,
Till


On Wed, Mar 21, 2018 at 4:08 PM, Eron Wright <[hidden email]> wrote:

Re: Migration to Flip6 Kubernetes

Derek VerLee

Is anyone actively working on direct Kubernetes support?

I'd be excited to see this get in sooner rather than later, and I'd be happy to start a PR.


On 3/22/18 10:37 AM, Till Rohrmann wrote: