How JobManager and TaskManager find each other?

KristoffSC
Hi all,
I was wondering how the JobManager and TaskManager find each other.
Do they use multicast for this?

Can it be configured to use domain names instead of IPs?
What do I have to do to run two Flink clusters in the same IP network?
How should I start a TaskManager so that it connects to cluster
B and not A?

Regards,
Krzysztof



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: How JobManager and TaskManager find each other?

Xintong Song
Hi Krzysztof,

You can use the following configuration options to specify JM/TM addresses and ports.
- jobmanager.rpc.address
- jobmanager.rpc.port
- taskmanager.host
- taskmanager.rpc.port

The configuration accepts both IP addresses and hostnames.

If you have two Flink Clusters, you can specify different JM address/ports in your TM configurations, so the TM knows which JM to connect to.
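
As a rough illustration of the point above, the following Python sketch renders the flink-conf.yaml lines a TaskManager would need in order to join one of two clusters. Only the option keys come from this thread; the hostnames, the `CLUSTERS` table, and the `render_tm_config` helper are hypothetical, and 6123 is simply Flink's default RPC port.

```python
# Hypothetical JobManager endpoints for two clusters on the same network.
# Only the option keys are real Flink settings; the hostnames are made up.
CLUSTERS = {
    "A": {"jobmanager.rpc.address": "jm-a.example.internal", "jobmanager.rpc.port": 6123},
    "B": {"jobmanager.rpc.address": "jm-b.example.internal", "jobmanager.rpc.port": 6123},
}


def render_tm_config(cluster: str, tm_host: str) -> str:
    """Return flink-conf.yaml lines for a TaskManager that should join `cluster`."""
    options = dict(CLUSTERS[cluster])
    # Address under which this TaskManager announces itself (IP or hostname).
    options["taskmanager.host"] = tm_host
    return "\n".join(f"{key}: {value}" for key, value in options.items())


# A TaskManager meant for cluster B gets cluster B's JobManager address.
print(render_tm_config("B", "tm-1.example.internal"))
```

The TaskManager does not discover anything here: whichever JobManager address ends up in its configuration decides which cluster it joins.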

Thank you~

Xintong Song




Re: How JobManager and TaskManager find each other?

Yang Wang
Hi Krzysztof,

Xintong has shared some information about the configuration. I just want to add a few
details.

>> How JobManager and TaskManager find each other?
For a non-HA Flink cluster, the taskmanager must be pre-configured with the jobmanager address.
For an HA Flink cluster, the taskmanager uses the leader retriever service to get the jobmanager
address. Currently, Flink provides a ZooKeeper implementation, which is widely used in
production.

>> Can it be configure to use domain names instead IP's?
It can be a domain name, but you should make sure that it can be resolved by the other
jobmanagers/taskmanagers.
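
One way to check this from a given machine is a quick resolution test. The sketch below uses only Python's standard `socket` module; the `can_resolve` helper is a hypothetical name, and the check only tells you what the local resolver sees, so it would have to be run on every host that needs to reach the address.

```python
import socket


def can_resolve(host: str) -> bool:
    """Return True if the local resolver can turn `host` into an address."""
    try:
        socket.getaddrinfo(host, None)
        return True
    except socket.gaierror:
        # Name resolution failed on this machine.
        return False


# Replace "localhost" with the value you plan to use for jobmanager.rpc.address.
print(can_resolve("localhost"))
```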

>> What I have to do to have two Flink Clusters in same IP network?
I am not sure what you mean. Do you mean that you have a NAT network in which multiple Docker containers
share an internal IP address?

>> How I should start task manager in order to tell him, to connect to cluster B not A?
As I said in the first section: for a non-HA Flink cluster, you need to specify the jobmanager RPC
address and port. For an HA Flink cluster, you do not need to specify the address, because the taskmanager
gets it from the leader retriever service.


Best,
Yang



Re: How JobManager and TaskManager find each other?

KristoffSC
Thanks all for the answers,

One more question though. In [1] we can see that task managers talk
to each other, sending data streams. How does each task manager know the
addresses of the other task managers?


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/concepts/runtime.html#job-managers-task-managers-clients




Re: How JobManager and TaskManager find each other?

Zhijiang(wangzhijiang999)
I guess you are referring to the data shuffle process among different task managers.

When a task manager (TM) registers itself with the job manager (JM), it also reports the IP address and data port that it listens on.
During task scheduling, the upstream TM's address info (IP, port) is included in the task
deployment descriptor of the respective downstream tasks. The downstream tasks can then connect to the remote upstream TM
to request data.

In short, the JM learns the addresses of all TMs via registration, and these addresses are then sent to the peers that need them during task scheduling and deployment.
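
The flow described above can be sketched as a toy model in Python. This is not Flink's code: `ToyJobManager`, `DeploymentDescriptor`, the TM ids, and the addresses are all hypothetical, and the real task deployment descriptor carries far more than a list of upstream addresses.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

Address = Tuple[str, int]  # (IP or hostname, data port)


@dataclass
class DeploymentDescriptor:
    """Toy stand-in for the task deployment descriptor."""
    task: str
    upstream: List[Address] = field(default_factory=list)


class ToyJobManager:
    def __init__(self) -> None:
        self.registered: Dict[str, Address] = {}

    def register(self, tm_id: str, host: str, data_port: int) -> None:
        # On registration, a TM reports the address and data port it listens on.
        self.registered[tm_id] = (host, data_port)

    def deploy(self, task: str, upstream_tms: List[str]) -> DeploymentDescriptor:
        # On deployment, the JM copies the upstream TMs' addresses into the descriptor.
        return DeploymentDescriptor(task, [self.registered[tm] for tm in upstream_tms])


jm = ToyJobManager()
jm.register("tm-1", "10.0.0.11", 40001)
jm.register("tm-2", "10.0.0.12", 40002)

# The downstream task learns where to request data from via its descriptor.
descriptor = jm.deploy("sink-task", upstream_tms=["tm-1"])
print(descriptor.upstream)  # [('10.0.0.11', 40001)]
```

The TMs never look each other up: the only party that knows every address is the JM, and it hands out just the addresses a given task needs.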

Best,
Zhijiang


Re: How JobManager and TaskManager find each other?

KristoffSC
Thank you very much.
What if a node's IP changes? Does Flink also support DNS names, or only "raw" IP
addresses?
I'm thinking about cloud deployments where the actual service/process can be
rescheduled to a different box, but there is a name resolution mechanism.

Also, what if there is NAT between the Task Manager and the Job Manager?




Re: How JobManager and TaskManager find each other?

Yang Wang
So do you mean that the IP address changes while running, or that the taskmanager
failed and was relaunched with the same hostname but a different IP address?

I am not sure whether the TM registers with the JM using its hostname or its IP address. We could
confirm that with @Zhijiang.

As for NAT, Flink currently does not work well with it, since the taskmanager
does not provide a bind host/port for others to connect to. There are some tickets tracking
this problem [1].




Best,
Yang


Re: How JobManager and TaskManager find each other?

KristoffSC
/So do you mean the ip address changes during running or the taskmanager
failed and relaunched with a same hostname, but the ip address is
different?/

Well, that too, but I was actually thinking about running Flink on PaaS
platforms, where a process can be re-spawned at runtime on a different
machine with a different IP.

Supposedly OpenShift does this.




Re: How JobManager and TaskManager find each other?

Yang Wang
Well, I think the biggest challenge when running Flink on PaaS platforms
is the NAT network. Since the jobmanager/taskmanagers are running
in a NAT network, they cannot talk to each other directly. What we need to do is
make the JM/TM bind to the local machine's hostname and use a bind port
for the communication.

The community is still working on this.


Best,
Yang


Re: How JobManager and TaskManager find each other?

KristoffSC
Thanks for the clarification about NAT.

Moving the NAT issue aside for a moment:

Is the process of sending the "task deployment descriptor" that you mentioned in
"Feb 26, 2020; 4:18pm", and especially the process of notifying a TaskManager
about the IPs of the other TaskManagers participating in a job, described somewhere? I'm
familiar with [1] [2], but they contain no information about sending the
IP information of Task Managers.


Another question is how this all adds up for a Kubernetes Job Session Cluster
deployment where nodes are deployed across many physical machines inside
a Kubernetes cluster,
if I'm using Kubernetes as described in [3].

The final question is: do I have to modify jobmanager.rpc.address and the
flink/conf/slaves file when running a Docker JobCluster on Kubernetes? The
default values are localhost.
Or will just following [3] be fine?

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/concepts/runtime.html
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html
[3]
https://github.com/apache/flink/tree/release-1.10/flink-container/kubernetes




Re: How JobManager and TaskManager find each other?

Yang Wang
Hi KristoffSC,

My answers to your questions are inline.

> 1. task deployment descriptor
The `TaskDeploymentDescriptor` is used by the JobMaster to submit a task to a TaskManager.
Since the JobMaster knows all the operators and their locations, it puts the upstream operator's location
into the `TaskDeploymentDescriptor`. So when the task is running, it always knows how to communicate
with the others.

> 2. Kubernetes job cluster
Deploying on Kubernetes is very different from NAT in PaaS. Kubernetes always has a
default overlay network. Each JobManager/TaskManager (i.e. Kubernetes Pod) is assigned
a unique hostname and IP [1], so they can talk to each other directly. You therefore do not need to set any
bind-host or bind-port.

> 3. Modify jobmanager.rpc.address
You need to create a Kubernetes Service and set `jobmanager.rpc.address` to the service name.
This is needed for JobManager fault tolerance: when the JobManager fails and is relaunched,
the TaskManagers can still use the service name to re-register with the JobManager.
You do not need to update conf/slaves; just follow the guide [2].
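
To make the Service idea concrete, here is a small Python sketch that builds a Service manifest as a plain dict and derives the matching `jobmanager.rpc.address` line from it. The service name `flink-jobmanager`, the selector labels, and the `jobmanager_service` helper are assumptions for illustration, not values from the guide; 6123 is Flink's default RPC port. The printed JSON could be passed to `kubectl apply -f -`, since Kubernetes accepts manifests in JSON as well as YAML.

```python
import json


def jobmanager_service(name: str, rpc_port: int = 6123) -> dict:
    """Build a minimal Service manifest that fronts the JobManager pod."""
    return {
        "apiVersion": "v1",
        "kind": "Service",
        "metadata": {"name": name},
        "spec": {
            # Hypothetical labels; they must match the JobManager pod's labels.
            "selector": {"app": "flink", "component": "jobmanager"},
            "ports": [{"name": "rpc", "port": rpc_port}],
        },
    }


service = jobmanager_service("flink-jobmanager")

# TaskManagers point at the stable service name, not at a pod IP.
flink_conf_line = f"jobmanager.rpc.address: {service['metadata']['name']}"

print(json.dumps(service, indent=2))
print(flink_conf_line)
```

Because the service name stays the same when the JobManager pod is replaced, the TaskManagers' configuration does not have to change after a failover.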




Best,
Yang
