(DEPRECATED) Apache Flink User Mailing List archive.
Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different nodes

javalass
I am using the Docker-Flink project in:
https://github.com/apache/flink/tree/master/flink-contrib/docker-flink 

I am creating the services with the following commands:
docker network create -d overlay overlay
docker service create --name jobmanager --env JOB_MANAGER_RPC_ADDRESS=jobmanager -p 8081:8081 --network overlay --constraint 'node.hostname == ubuntu-swarm-manager' flink jobmanager
docker service create --name taskmanager --env JOB_MANAGER_RPC_ADDRESS=jobmanager --network overlay --constraint 'node.hostname != ubuntu-swarm-manager' flink taskmanager

I wonder if there's any configuration I'm missing. This is the error I get:
- Trying to register at JobManager akka.tcp://flink@jobmanager:6123/user/jobmanager (attempt 4, timeout: 4000 milliseconds)

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Piotr Nowojski
Did you try to expose the required ports listed in the README when starting the containers?

https://github.com/apache/flink/tree/master/flink-contrib/docker-flink

Ports:
• The Web Client is on port 48081
• JobManager RPC port 6123 (default, not exposed to host)
• TaskManagers RPC port 6122 (default, not exposed to host)
• TaskManagers Data port 6121 (default, not exposed to host)

Piotrek

javalass
Hi Piotr,


Thank you very much for your reply.


Yes, I have tried to open these ports when I create the services.  If I create them with:


docker service create --name jobmanager --env JOB_MANAGER_RPC_ADDRESS=jobmanager  -p 8081:8081 -p 6123:6123 -p 48081:48081  --network overlay --constraint 'node.hostname == ubuntu-swarm-manager' flink jobmanager

docker service create --name taskmanager --env JOB_MANAGER_RPC_ADDRESS=jobmanager -p 6121:6121 -p 6122:6122 --network overlay --constraint 'node.hostname != ubuntu-swarm-manager' flink taskmanager

I still get the same issue.


Thank you very much for taking the time to look at this.


Best wishes,


Thalita


javalass
I think I may be getting somewhere with this. I have opened the blob.server.port and the query.server.port on the TaskManager service, and I can now connect to JobManager from nodes in the same subnet.

However, nodes that are located in different clouds don't seem to be able to resolve the 'jobmanager' host by name:

ubuntu@osdc-swarm-worker-1:~$ sudo docker ps
CONTAINER ID        IMAGE               COMMAND                  CREATED             STATUS              PORTS                NAMES
e6a26caf81b4        flink:latest        "/docker-entrypoin..."   21 seconds ago      Up 1 second         6123/tcp, 8081/tcp   taskmanager.7.k0nc3tb7pxv4ppfuaxg155ku5
e12a280860a7        flink:latest        "/docker-entrypoin..."   21 seconds ago      Up 2 seconds        6123/tcp, 8081/tcp   taskmanager.8.si7f8wk132jn9z5hwbx568nbj
b459162a8ef6        flink:latest        "/docker-entrypoin..."   22 seconds ago      Up 5 seconds        6123/tcp, 8081/tcp   taskmanager.3.x2s45mt0qyx2eucirxwj0wmyx
ubuntu@osdc-swarm-worker-1:~$ sudo docker logs e12a280860a7
Starting Task Manager
config file: 
jobmanager.rpc.address: jobmanager
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 1024
taskmanager.heap.mb: 1024
taskmanager.numberOfTaskSlots: 2
taskmanager.memory.preallocate: false
parallelism.default: 1
jobmanager.web.port: 8081
blob.server.port: 6124
query.server.port: 6125
Starting taskmanager as a console application on host e12a280860a7.
2017-11-02 18:46:35,481 WARN  org.apache.hadoop.util.NativeCodeLoader                       - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2017-11-02 18:46:35,744 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - --------------------------------------------------------------------------------
2017-11-02 18:46:35,744 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -  Starting TaskManager (Version: 1.3.2, Rev:0399bee, Date:03.08.2017 @ 10:23:11 UTC)
2017-11-02 18:46:35,744 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -  Current user: flink
2017-11-02 18:46:35,744 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.141-b15
2017-11-02 18:46:35,745 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -  Maximum heap size: 1024 MiBytes
2017-11-02 18:46:35,745 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -  JAVA_HOME: /docker-java-home/jre
2017-11-02 18:46:35,752 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -  Hadoop version: 2.7.2
2017-11-02 18:46:35,753 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -  JVM Options:
2017-11-02 18:46:35,753 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -XX:+UseG1GC
2017-11-02 18:46:35,753 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -Xms1024M
2017-11-02 18:46:35,753 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -Xmx1024M
2017-11-02 18:46:35,753 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -XX:MaxDirectMemorySize=8388607T
2017-11-02 18:46:35,753 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
2017-11-02 18:46:35,753 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
2017-11-02 18:46:35,753 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -  Program Arguments:
2017-11-02 18:46:35,753 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     --configDir
2017-11-02 18:46:35,754 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     /opt/flink/conf
2017-11-02 18:46:35,754 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -  Classpath: /opt/flink/lib/flink-python_2.11-1.3.2.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.3.2.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/lib/flink-dist_2.11-1.3.2.jar:::
2017-11-02 18:46:35,754 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - --------------------------------------------------------------------------------
2017-11-02 18:46:35,757 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Registered UNIX signal handlers for [TERM, HUP, INT]
2017-11-02 18:46:35,781 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Maximum number of open file descriptors is 1048576
2017-11-02 18:46:35,834 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Loading configuration from /opt/flink/conf
2017-11-02 18:46:35,843 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, jobmanager
2017-11-02 18:46:35,843 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2017-11-02 18:46:35,844 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.mb, 1024
2017-11-02 18:46:35,844 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.mb, 1024
2017-11-02 18:46:35,844 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 2
2017-11-02 18:46:35,845 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.memory.preallocate, false
2017-11-02 18:46:35,845 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 1
2017-11-02 18:46:35,845 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.web.port, 8081
2017-11-02 18:46:35,847 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: blob.server.port, 6124
2017-11-02 18:46:35,847 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: query.server.port, 6125
2017-11-02 18:46:35,865 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, jobmanager
2017-11-02 18:46:35,865 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2017-11-02 18:46:35,865 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.mb, 1024
2017-11-02 18:46:35,866 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.mb, 1024
2017-11-02 18:46:35,866 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 2
2017-11-02 18:46:35,866 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.memory.preallocate, false
2017-11-02 18:46:35,867 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 1
2017-11-02 18:46:35,867 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.web.port, 8081
2017-11-02 18:46:35,869 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: blob.server.port, 6124
2017-11-02 18:46:35,869 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: query.server.port, 6125
2017-11-02 18:46:35,936 INFO  org.apache.flink.runtime.security.modules.HadoopModule        - Hadoop user set to flink (auth:SIMPLE)
2017-11-02 18:46:36,522 ERROR org.apache.flink.runtime.taskmanager.TaskManager              - Failed to run TaskManager.
java.net.UnknownHostException: jobmanager: Name or service not known
    at java.net.Inet4AddressImpl.lookupAllHostAddr(Native Method)
    at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928)
    at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323)
    at java.net.InetAddress.getAllByName0(InetAddress.java:1276)
    at java.net.InetAddress.getAllByName(InetAddress.java:1192)
    at java.net.InetAddress.getAllByName(InetAddress.java:1126)
    at java.net.InetAddress.getByName(InetAddress.java:1076)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.getRpcUrl(AkkaRpcServiceUtils.java:173)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.getRpcUrl(AkkaRpcServiceUtils.java:138)
    at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:78)
    at org.apache.flink.runtime.taskmanager.TaskManager$.selectNetworkInterfaceAndRunTaskManager(TaskManager.scala:1663)
    at org.apache.flink.runtime.taskmanager.TaskManager$$anon$2.call(TaskManager.scala:1574)
    at org.apache.flink.runtime.taskmanager.TaskManager$$anon$2.call(TaskManager.scala:1572)
    at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
    at org.apache.flink.runtime.taskmanager.TaskManager$.main(TaskManager.scala:1572)
    at org.apache.flink.runtime.taskmanager.TaskManager.main(TaskManager.scala)
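
A quick way to narrow down an UnknownHostException like the one above is to check whether the service name resolves at all from where the TaskManager runs. The following is a minimal sketch, not a definitive diagnostic: check_resolve is a hypothetical helper name, and it assumes getent is available in the image (true for glibc-based images, not verified for this one).

```shell
#!/bin/sh
# Hypothetical helper: report whether a hostname resolves on this machine.
# Assumes 'getent' is installed; if it is missing, the name is reported as unresolved.
check_resolve() {
    host="$1"
    if getent hosts "$host" >/dev/null 2>&1; then
        echo "resolved: $host"
    else
        echo "unresolved: $host"
        return 1
    fi
}

# Inside a TaskManager container one would call:
#   check_resolve jobmanager
```

The same lookup can be run from the host with 'docker exec <container-id> getent hosts jobmanager'. If the name resolves on nodes in the same subnet but not on nodes in another cloud, it may be worth checking that the swarm overlay network itself spans those nodes: Docker's overlay networking needs TCP 2377, TCP/UDP 7946 and UDP 4789 open between all swarm nodes.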

javalass
In reply to this post by Piotr Nowojski

Just an update: by changing the JOB_MANAGER_RPC_ADDRESS to the public IP of the JobManager and exposing port 6123 as {{PUBLIC_IP}}:6123:6123, I managed to get the TaskManagers from different nodes and even different subnets to talk to the JobManager.


This is how I created the services:


docker network create -d overlay overlay

docker service create --name jobmanager --env JOB_MANAGER_RPC_ADDRESS={{PUBLIC_IP}} -p 8081:8081 -p {{PUBLIC_IP}}:6123:6123 -p 48081:48081 -p 6124:6124 -p 6125:6125 --network overlay --constraint 'node.hostname == ubuntu-swarm-manager' flink jobmanager

docker service create --name taskmanager --env JOB_MANAGER_RPC_ADDRESS={{PUBLIC_IP}} -p 6121:6121 -p 6122:6122 --network overlay --constraint 'node.hostname != ubuntu-swarm-manager' flink taskmanager

However, I am still encountering errors further down the line. When I submit a job using the Web UI, it fails because the JobManager can't talk to the TaskManager on port 35033. I presume this is the taskmanager.data.port, which needs to be set to a range and this range exposed when I create the service?

Are there any other ports that I need to open at service creation time?

Connecting the channel failed: Connecting to remote task manager + '/{{IP_ADDRESS_OF_MANAGER}}:35033' has failed. This might indicate that the remote task manager has been lost.
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:131)
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:83)
	at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59)
	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:112)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:433)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:455)
	at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:91)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:748)
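
A port such as 35033 is what one would expect when taskmanager.data.port is left at its default of 0, which lets the OS pick a free port at startup. Pinning the port in flink-conf.yaml makes it known in advance. The sketch below only shows the configuration lines; pin_taskmanager_ports and the CONF variable are hypothetical names, and how the file reaches the container (derived image, mounted config) is left open.

```shell
#!/bin/sh
# Sketch: append fixed TaskManager ports to a Flink configuration file.
# CONF defaults to a scratch file so the sketch is safe to run anywhere;
# inside the image from this thread the real file would live under
# /opt/flink/conf (the configDir shown in the TaskManager log earlier).
CONF="${CONF:-$(mktemp)}"

pin_taskmanager_ports() {
    # 6121 (data) and 6122 (RPC) are the values listed in the docker-flink README.
    printf '%s\n' \
        'taskmanager.data.port: 6121' \
        'taskmanager.rpc.port: 6122' >> "$1"
}

pin_taskmanager_ports "$CONF"
cat "$CONF"
```

With fixed values, the -p 6121:6121 and -p 6122:6122 mappings in the service definition would at least point at ports the TaskManager actually listens on.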



Piotr Nowojski
Till, is there a list somewhere of the ports that need to be exposed that is more up to date than the docker-flink README?

Piotrek

Till Rohrmann
Hi Thalita,

In order to make Flink work, I think you have to expose the JobManager RPC port and the blob server port, and make sure that the TaskManagers can talk to each other by exposing the `taskmanager.data.port`. The query server port is only necessary if you want to use queryable state.

I've pulled in Patrick who has more experience with running Flink on top of Docker. He'll definitely be able to provide more detailed recommendations.

Cheers,
Till
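
To see which of the ports mentioned above are actually reachable from another node, a small probe can help. This is a minimal sketch under stated assumptions: port_open is a hypothetical helper, and it relies on bash (for the /dev/tcp pseudo-device) and the coreutils timeout command being installed where it runs.

```shell
#!/bin/sh
# Hypothetical helper: report whether a TCP port on a host accepts connections.
# Uses bash's /dev/tcp pseudo-device, so bash and 'timeout' must be installed;
# if either is missing, the port is reported as closed.
port_open() {
    host="$1"
    port="$2"
    if timeout 3 bash -c "exec 3<>/dev/tcp/$host/$port" 2>/dev/null; then
        echo "open: $host:$port"
    else
        echo "closed: $host:$port"
        return 1
    fi
}

# From a TaskManager node one would probe, for example:
#   port_open jobmanager 6123   # JobManager RPC port
#   port_open jobmanager 6124   # blob.server.port as configured in this thread
```

A "closed" result does not say whether the cause is a firewall, a missing port mapping, or a process that is not listening; it only shows that the connection attempt failed from that node.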

javalass

Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different nodes


Hi Till,


Thanks a lot for your answer. 


Is the taskmanager.data.port unique per TaskManager? The documentation says it is assigned at runtime by the OS. My thinking here is that you would need to know what that is at service creation time, which would go against the whole idea of how services are scaled in Docker Swarm.


When you create a Swarm service using 'docker stack deploy' or 'docker service create', the configuration that is used at that point is the same that will be used by all instances of the service. If you then scale TaskManager to 8 or 10 containers, each of them gets the same service configuration (the one used to create the service).


I have in fact tried to map specific ports in the TaskManager service configuration, but then I got "port already in use" when I tried to scale up the service. 


I wonder if there is a way around it.


Maybe the people who developed the create-docker-swarm-service.sh script in the docker-flink project would be able to shed some light?




From: Till Rohrmann <[hidden email]>
Sent: 06 November 2017 12:40:33
To: Piotr Nowojski
Cc: Vergilio, Thalita; [hidden email]; Patrick Lucas
Subject: Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different nodes
 
Hi Thalita,

In order to make Flink work, I think you have to expose the JobManager RPC port and the Blob server port, and make sure that the TaskManagers can talk to each other by exposing the `taskmanager.data.port`. The query server port is only necessary if you want to use queryable state.

I've pulled in Patrick who has more experience with running Flink on top of Docker. He'll definitely be able to provide more detailed recommendations.

Cheers,
Till

On Mon, Nov 6, 2017 at 9:22 AM, Piotr Nowojski <[hidden email]> wrote:
Till, is there somewhere a list of ports that need to be exposed that’s more up to date compared to the docker-flink README?

Piotrek

On 3 Nov 2017, at 10:23, Vergilio, Thalita <[hidden email]> wrote:

Just an update: by changing the JOB_MANAGER_RPC_ADDRESS to the public IP of the JobManager and exposing port 6123 as {{PUBLIC_IP}}:6123:6123, I managed to get the TaskManagers from different nodes and even different subnets to talk to the JobManager.

This is how I created the services:

docker network create -d overlay overlay

docker service create --name jobmanager --env JOB_MANAGER_RPC_ADDRESS={{PUBLIC_IP}}  -p 8081:8081 -p{{PUBLIC_IP}}:6123:6123 -p 48081:48081 -p 6124:6124 -p 6125:6125 --network overlay --constraint 'node.hostname == ubuntu-swarm-manager' flink jobmanager

docker service create --name taskmanager --env JOB_MANAGER_RPC_ADDRESS={{PUBLIC_IP}}  -p 6121:6121 -p 6122:6122  --network overlay --constraint 'node.hostname != ubuntu-swarm-manager' flink taskmanager

However, I am still encountering errors further down the line. When I submit a job using the Web UI, it fails because the JobManager can't talk to the TaskManager on port 35033. I presume this is the taskmanager.data.port, which needs to be set to a range and this range exposed when I create the service?

Are there any other ports that I need to open at service creation time?

Connecting the channel failed: Connecting to remote task manager + '/{{IP_ADDRESS_OF_MANAGER}}:35033' has failed. This might indicate that the remote task manager has been lost.
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:131)
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:83)
	at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59)
	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:112)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:433)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:455)
	at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:91)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:748)




Till Rohrmann

Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different nodes

I'm not entirely sure how Docker Swarm works, but from the Flink perspective there mustn't be two TaskManagers running on the same host (meaning an entity that shares the same address) if you set the TaskManager data port to a fixed value (otherwise only one of them can be started due to port conflicts). If you can ensure that this is the case, then it should be safe to specify a port for the data transmission.

Cheers,
Till
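
One way the constraint above could be met in Swarm, as a rough sketch rather than a tested recipe: run the TaskManager service in global mode, so that Swarm schedules at most one task per eligible node, and publish the fixed ports in host mode so that each node exposes its own TaskManager directly. The network name, constraint and image are the ones used earlier in this thread. The port values mirror the README list quoted in the thread, and the sketch assumes that `taskmanager.rpc.port` and `taskmanager.data.port` have already been pinned to those values in the image's flink-conf.yaml, which the thread does not confirm. The function name is hypothetical.

```shell
#!/bin/sh
# Sketch: one TaskManager per node (global mode), fixed ports published in host mode.
# Assumed, not confirmed by the thread: taskmanager.rpc.port=6122 and
# taskmanager.data.port=6121 are already set in the image's flink-conf.yaml.
RPC_PORT="${RPC_PORT:-6122}"
DATA_PORT="${DATA_PORT:-6121}"

# Hypothetical helper: prints the service definition instead of running it.
# Pipe the output to `sh` on a Swarm manager to actually create the service.
taskmanager_service_cmd() {
  cat <<EOF
docker service create --name taskmanager \\
  --mode global \\
  --env JOB_MANAGER_RPC_ADDRESS=jobmanager \\
  --network overlay \\
  --constraint 'node.hostname != ubuntu-swarm-manager' \\
  --publish mode=host,target=${RPC_PORT},published=${RPC_PORT} \\
  --publish mode=host,target=${DATA_PORT},published=${DATA_PORT} \\
  flink taskmanager
EOF
}

taskmanager_service_cmd
```

With a global service the number of TaskManagers follows the number of matching nodes, so capacity is added by joining nodes to the Swarm rather than by scaling the service.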



javalass

Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different nodes


Hi Till,


I have made some progress with the name resolution for machines that are not in the same subnet. The problem I am facing now is Flink-specific, so I wonder if you could help me.


It is all running fine in a multi-cloud setup with the jobmanager in Azure and the taskmanager in the Google cloud. However, when I scale the taskmanager up and it starts running on Azure nodes as well, I get an Akka error which I presume means the taskmanagers can't talk to each other when parallelising the task.


Do you know what the IP address and port below are? Are they assigned by Flink? 


Thank you very much.


Thalita


java.lang.Exception: Cannot deploy task Source: Read(UnboundedKafkaSource) -> Flat Map -> KafkaPuePipelineProcessor/Window.Into()/Window.Assign.out -> ParMultiDo(Anonymous) -> ToKeyedWorkItem (2/3) (b9f31626fb7d83d39e24e570e034f03e) - TaskManager (3a9c37463c88510a44097df0c99b5f90 @ 172.18.0.3 (dataPort=38963)) not responding after a timeout of 10000 ms
	at org.apache.flink.runtime.executiongraph.Execution$2.apply(Execution.java:437)
	at org.apache.flink.runtime.executiongraph.Execution$2.apply(Execution.java:429)
	at org.apache.flink.runtime.concurrent.impl.FlinkFuture$3.recover(FlinkFuture.java:201)
	at akka.dispatch.Recover.internal(Future.scala:268)
	at akka.dispatch.japi$RecoverBridge.apply(Future.scala:184)
	at akka.dispatch.japi$RecoverBridge.apply(Future.scala:182)
	at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
	at scala.util.Try$.apply(Try.scala:161)
	at scala.util.Failure.recover(Try.scala:185)
	at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
	at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@172.18.0.3:37959/user/taskmanager#364916492]] after [10000 ms]
	at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
	at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
	at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
	at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
	at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
	at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
	at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
	at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
	... 1 more
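
For orientation when reading the trace above: in an address of the form `akka.tcp://flink@HOST:PORT/user/taskmanager`, HOST:PORT is where that TaskManager's actor system (its RPC endpoint, governed by `taskmanager.rpc.port`) listens, while `dataPort` is the separate data-exchange port (`taskmanager.data.port`). 172.18.0.3 falls in the private 172.16.0.0/12 range, so it is probably a container-internal address that nodes in another cloud cannot route to, although the thread does not confirm this. The helper names below are hypothetical; this is only a small sketch that extracts host and port from such an address so the endpoint can be probed from another node or container.

```shell
#!/bin/sh
# Hypothetical helper: print "HOST PORT" taken from an Akka actor address.
akka_endpoint() {
  printf '%s\n' "$1" |
    sed -n 's|^akka\.tcp://[^@]*@\([^:/]*\):\([0-9][0-9]*\).*$|\1 \2|p'
}

# Hypothetical helper: succeeds if a TCP connection to HOST PORT opens within 3 s.
# Assumes a netcat build that supports the -z and -w options.
can_reach() {
  nc -z -w 3 "$1" "$2"
}

addr='akka.tcp://flink@172.18.0.3:37959/user/taskmanager#364916492'
set -- $(akka_endpoint "$addr")
echo "RPC endpoint: host=$1 port=$2"

# Uncomment to probe from the machine or container that reports the timeout:
# can_reach "$1" "$2" || echo "TaskManager RPC endpoint is not reachable from here"
```

If the probe fails from the JobManager's container but succeeds from a container on the same node as the TaskManager, that would point at the advertised address or the network path rather than at Flink itself.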



javalass

Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different nodes


Hi All,


I just wanted to let you know that I have finally managed to get the multi-cloud setup working!! I honestly can't believe my eyes. I used a Docker plugin called Weave to create the Swarm network, gave each node a public external IP address, and opened a range of ports, and I can now get my Google Cloud machine to connect to the Azure machines.


There are still some minor issues: for example, I don't know exactly which ports to open for TaskManager communication in Flink. They seem to be assigned randomly at runtime, so I had to open a wide range of ports to allow the communication to happen, which is far from ideal.


Is there a way of finding out what these ports are and setting them to a constant value? Looking at the documentation, the suspects are:


  • taskmanager.rpc.port: The task manager’s IPC port (DEFAULT: 0, which lets the OS choose a free port).

  • taskmanager.data.port: The task manager’s port used for data exchange operations (DEFAULT: 0, which lets the OS choose a free port).

  • blob.server.port: Port definition for the blob server (serving user JARs) on the TaskManagers. By default the port is set to 0, which means that the operating system is picking an ephemeral port. Flink also accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple JobManagers are running on the same machine.

Many thanks,


Thalita
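
As a possible starting point for the question above, the three settings quoted from the documentation can be pinned in flink-conf.yaml so that firewall rules and published ports can be exact. This is a sketch under assumptions: the file location, the helper name and the chosen values are illustrative (6121 and 6122 mirror the README list quoted in this thread, and 6124 is assumed to be the blob server port because it is one of the ports published for the JobManager in the earlier commands). As Till notes, fixed TaskManager ports only work if no two TaskManagers share the same address.

```shell
#!/bin/sh
# Sketch: append fixed port settings to a Flink configuration file.
# FLINK_CONF and the port values are illustrative assumptions, not values
# confirmed in this thread. If the keys already exist in the file, edit them
# in place instead of appending.
FLINK_CONF="${FLINK_CONF:-./flink-conf.yaml}"

# Hypothetical helper: writes the three port settings to the file given as $1.
pin_flink_ports() {
  cat >> "$1" <<'EOF'
# Fixed ports so that firewall rules and published ports can be exact.
taskmanager.rpc.port: 6122
taskmanager.data.port: 6121
blob.server.port: 6124
EOF
}

pin_flink_ports "$FLINK_CONF"
# Show the resulting port settings.
grep -E '^(taskmanager\.(rpc|data)|blob\.server)\.port:' "$FLINK_CONF"
```

The same file has to reach every container, for example by baking it into the image, so that all TaskManagers and the JobManager agree on the ports.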



From: Vergilio, Thalita
Sent: 09 November 2017 22:04:24
To: Till Rohrmann
Cc: Piotr Nowojski; [hidden email]; Patrick Lucas
Subject: Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different nodes
 

Hi Till,


I have made some progress with the name resolution for machines that are not in the same subnet. The problem I am facing now is Flink-specific, so I wonder if you could help me.


It is all running fine in a multi-cloud setup with the jobmanager in Azure and the taskmanager in the Google cloud. However, when I scale the taskmanager up and it start running on Azure nodes as well, I get an Akka error which I presume means the taskmanagers can't talk to each other when parallelising the task.


Do you know what the IP address and port below are? Are they assigned by Flink? 


Thank you very much.


Thalita


java.lang.Exception: Cannot deploy task Source: Read(UnboundedKafkaSource) -> Flat Map -> KafkaPuePipelineProcessor/Window.Into()/Window.Assign.out -> ParMultiDo(Anonymous) -> ToKeyedWorkItem (2/3) (b9f31626fb7d83d39e24e570e034f03e) - TaskManager (3a9c37463c88510a44097df0c99b5f90 @ 172.18.0.3 (dataPort=38963)) not responding after a timeout of 10000 ms
	at org.apache.flink.runtime.executiongraph.Execution$2.apply(Execution.java:437)
	at org.apache.flink.runtime.executiongraph.Execution$2.apply(Execution.java:429)
	at org.apache.flink.runtime.concurrent.impl.FlinkFuture$3.recover(FlinkFuture.java:201)
	at akka.dispatch.Recover.internal(Future.scala:268)
	at akka.dispatch.japi$RecoverBridge.apply(Future.scala:184)
	at akka.dispatch.japi$RecoverBridge.apply(Future.scala:182)
	at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
	at scala.util.Try$.apply(Try.scala:161)
	at scala.util.Failure.recover(Try.scala:185)
	at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
	at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@172.18.0.3:37959/user/taskmanager#364916492]] after [10000 ms]
	at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
	at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
	at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
	at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
	at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
	at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
	at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
	at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
	... 1 more


From: Till Rohrmann <[hidden email]>
Sent: 06 November 2017 13:48:59
To: Vergilio, Thalita
Cc: Piotr Nowojski; [hidden email]; Patrick Lucas
Subject: Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different nodes
 
I'm not entirely sure how docker swarm works but from the Flink perspective there mustn't be two TaskManagers running on the same host (meaning an entity where you share the same address) if you set the TaskManager data port to a fixed value (otherwise only one of them can be started due to port conflicts). If you can ensure that this is the case, then it should be save to specify a port for the data transmission.

Cheers,
Till

On Mon, Nov 6, 2017 at 2:37 PM, Vergilio, Thalita <[hidden email]> wrote:

Hi Till,


Thanks a lot for your answer. 


Is the taskmanager.data.port unique per TaskManager? The documentation says it is assigned at runtime by the OS. My thinking here is that you would need to know what that is at service creation time, which would go against the whole idea of how services are scaled in Docker Swarm.


When you create a Swarm service using 'docker stack deploy' or 'docker service create', the configuration that is used at that point is the same that will be used by all instances of the service. If you then scale TaskManager to 8 or 10 containers, each of them gets the same service configuration(the one used to create the service).


I have in fact tried to map specific ports in the TaskManager service configuration, but then I got "port already in use" when I tried to scale up the service. 


I wonder if there is a way around it.


Maybe the people who developed the create-docker-swarm-service.sh script in the docker-flink project would be able to shed some light?




From: Till Rohrmann <[hidden email]>
Sent: 06 November 2017 12:40:33
To: Piotr Nowojski
Cc: Vergilio, Thalita; [hidden email]; Patrick Lucas

Subject: Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different nodes
 
Hi Thalita,

In order to make Flink work, I think you have to expose the JobManager RPC port and the blob server port, and make sure that the TaskManagers can talk to each other by exposing the `taskmanager.data.port`. The query server port is only necessary if you want to use queryable state. 

I've pulled in Patrick who has more experience with running Flink on top of Docker. He'll definitely be able to provide more detailed recommendations.

Cheers,
Till
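
Editor's sketch of a small check for the ports named above: it reads a flink-conf.yaml-style file and reports, for each of the three settings, whether the port is fixed or left for the OS to choose at startup. The function name report_ports and the sample file are hypothetical. The defaults used (6123 for jobmanager.rpc.port, 0 for the other two) are the ones quoted from the README and documentation elsewhere in this thread.

```shell
#!/bin/sh
# report_ports FILE: print the effective value of each port setting and flag
# the ones that are 0 or unset with a default of 0 (picked by the OS at startup,
# so they cannot be opened in advance).
report_ports() {
    file="$1"
    # key=default pairs; defaults as quoted in this thread
    for pair in jobmanager.rpc.port=6123 blob.server.port=0 taskmanager.data.port=0; do
        key="${pair%%=*}"
        default="${pair#*=}"
        # Take the first line that starts with "key:" and strip the key part.
        value="$(awk -v k="$key" 'index($0, k ":") == 1 { sub(/^[^:]*:[ \t]*/, ""); print; exit }' "$file")"
        [ -n "$value" ] || value="$default"
        if [ "$value" = "0" ]; then
            echo "$key: OS-assigned at startup"
        else
            echo "$key: $value"
        fi
    done
}

# Example run against a throwaway sample file.
sample="$(mktemp)"
printf 'jobmanager.rpc.port: 6123\ntaskmanager.data.port: 6121\n' > "$sample"
report_ports "$sample"
rm -f "$sample"
```

Any setting reported as OS-assigned is a candidate for the kind of "random port" connection failure described in this thread.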

On Mon, Nov 6, 2017 at 9:22 AM, Piotr Nowojski <[hidden email]> wrote:
Till, is there a list somewhere of the ports that need to be exposed that's more up to date than the docker-flink README?

Piotrek

On 3 Nov 2017, at 10:23, Vergilio, Thalita <[hidden email]> wrote:

Just an update: by changing the JOB_MANAGER_RPC_ADDRESS to the public IP of the JobManager and exposing port 6123 as {{PUBLIC_IP}}:6123:6123, I managed to get the TaskManagers from different nodes and even different subnets to talk to the JobManager.

This is how I created the services:

docker network create -d overlay overlay

docker service create --name jobmanager --env JOB_MANAGER_RPC_ADDRESS={{PUBLIC_IP}}  -p 8081:8081 -p{{PUBLIC_IP}}:6123:6123 -p 48081:48081 -p 6124:6124 -p 6125:6125 --network overlay --constraint 'node.hostname == ubuntu-swarm-manager' flink jobmanager

docker service create --name taskmanager --env JOB_MANAGER_RPC_ADDRESS={{PUBLIC_IP}}  -p 6121:6121 -p 6122:6122  --network overlay --constraint 'node.hostname != ubuntu-swarm-manager' flink taskmanager

However, I am still encountering errors further down the line. When I submit a job using the Web UI, it fails because the JobManager can't talk to the TaskManager on port 35033. I presume this is the taskmanager.data.port, which needs to be set to a range and this range exposed when I create the service?

Are there any other ports that I need to open at service creation time?

Connecting the channel failed: Connecting to remote task manager + '/{{IP_ADDRESS_OF_MANAGER}}:35033' has failed. This might indicate that the remote task manager has been lost.
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:131)
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:83)
	at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59)
	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:112)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:433)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:455)
	at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:91)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:748)



From: Piotr Nowojski <[hidden email]>
Sent: 02 November 2017 14:26:32
To: Vergilio, Thalita
Cc: [hidden email]
Subject: Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different nodes
 
Did you try to expose required ports that are listed in the README when starting the containers?

https://github.com/apache/flink/tree/master/flink-contrib/docker-flink

Ports:
• The Web Client is on port 48081
• JobManager RPC port 6123 (default, not exposed to host)
• TaskManagers RPC port 6122 (default, not exposed to host)
• TaskManagers Data port 6121 (default, not exposed to host)

Piotrek

On 2 Nov 2017, at 14:44, javalass <[hidden email]> wrote:

I am using the Docker-Flink project in:
https://github.com/apache/flink/tree/master/flink-contrib/docker-flink 

I am creating the services with the following commands:
docker network create -d overlay overlay
docker service create --name jobmanager --env
JOB_MANAGER_RPC_ADDRESS=jobmanager -p 8081:8081 --network overlay
--constraint 'node.hostname == ubuntu-swarm-manager' flink jobmanager
docker service create --name taskmanager --env
JOB_MANAGER_RPC_ADDRESS=jobmanager --network overlay --constraint
'node.hostname != ubuntu-swarm-manager' flink taskmanager

I wonder if there's any configuration I'm missing. This is the error I get:
- Trying to register at JobManager akka.tcp://flink@jobmanager:6123/  
user/jobmanager (attempt 4, timeout: 4000 milliseconds)







--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

To view the terms under which this email is distributed, please go to:- 
http://disclaimer.leedsbeckett.ac.uk/disclaimer/disclaimer.html



Till Rohrmann

Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different nodes

Hi Thalita, yes, you can use the configuration parameters you mentioned to set the ports for the TaskManager and the BlobServer. However, you must make sure that at most one TM is running on each host; otherwise you will run into port collisions.

For taskmanager.rpc.port and blob.server.port you can define a range.

Cheers,
Till

On Fri, Nov 10, 2017 at 11:47 AM, Vergilio, Thalita <[hidden email]> wrote:

Hi All,


I just wanted to let you know that I have finally managed to get the multi-cloud setup working!! I honestly can't believe my eyes. I used a Docker plugin called Weave to create the Swarm network, a public external IP address for each node and opened a range of ports, and I can now get my Google Cloud machine to connect to the Azure machines.


There are still some minor issues, i.e. I don't know which exact ports to open for TaskManager communication in Flink. They seem to be getting assigned randomly at runtime, so I had to open a wide range of ports to allow the communication to happen, which is far from ideal.


Is there a way of finding out what these ports are and setting them to a constant value? Looking at the documentation, the suspects are:


  • taskmanager.rpc.port: The task manager’s IPC port (DEFAULT: 0, which lets the OS choose a free port).

  • taskmanager.data.port: The task manager’s port used for data exchange operations (DEFAULT: 0, which lets the OS choose a free port).

  • blob.server.port: Port definition for the blob server (serving user JARs) on the TaskManagers. By default the port is set to 0, which means that the operating system is picking an ephemeral port. Flink also accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple JobManagers are running on the same machine.
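
Editor's sketch of how the three settings listed above could be pinned to known values in flink-conf.yaml, for example from a script run while building or starting the image. The helper name set_conf and the default file path are hypothetical. Ports 6122 and 6121 are the values named in the docker-flink README; 6124 for the blob server is only an assumption based on the ports published earlier in this thread.

```shell
#!/bin/sh
# set_conf FILE KEY VALUE: make sure FILE contains exactly one "KEY: VALUE" line.
set_conf() {
    file="$1"; key="$2"; value="$3"
    touch "$file"
    # Keep every line that does not start with "KEY:", then append the new value.
    awk -v k="$key" 'index($0, k ":") != 1' "$file" > "${file}.tmp"
    printf '%s: %s\n' "$key" "$value" >> "${file}.tmp"
    mv "${file}.tmp" "$file"
}

conf="${FLINK_CONF:-./flink-conf.yaml}"   # path is an assumption; adjust for the image
set_conf "$conf" taskmanager.rpc.port 6122
set_conf "$conf" taskmanager.data.port 6121
set_conf "$conf" blob.server.port 6124    # assumed value, not confirmed in the thread
```

Because the helper replaces an existing line instead of appending a second one, it can be run repeatedly without leaving duplicate keys in the file.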

Many thanks,


Thalita



From: Vergilio, Thalita
Sent: 09 November 2017 22:04:24
To: Till Rohrmann

Cc: Piotr Nowojski; [hidden email]; Patrick Lucas
Subject: Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different nodes
 

Hi Till,


I have made some progress with the name resolution for machines that are not in the same subnet. The problem I am facing now is Flink-specific, so I wonder if you could help me.


It is all running fine in a multi-cloud setup with the jobmanager in Azure and the taskmanager in Google Cloud. However, when I scale the taskmanager up and it starts running on Azure nodes as well, I get an Akka error which I presume means the taskmanagers can't talk to each other when parallelising the task.


Do you know what the IP address and port below are? Are they assigned by Flink? 


Thank you very much.


Thalita


java.lang.Exception: Cannot deploy task Source: Read(UnboundedKafkaSource) -> Flat Map -> KafkaPuePipelineProcessor/Window.Into()/Window.Assign.out -> ParMultiDo(Anonymous) -> ToKeyedWorkItem (2/3) (b9f31626fb7d83d39e24e570e034f03e) - TaskManager (3a9c37463c88510a44097df0c99b5f90 @ 172.18.0.3 (dataPort=38963)) not responding after a timeout of 10000 ms
	at org.apache.flink.runtime.executiongraph.Execution$2.apply(Execution.java:437)
	at org.apache.flink.runtime.executiongraph.Execution$2.apply(Execution.java:429)
	at org.apache.flink.runtime.concurrent.impl.FlinkFuture$3.recover(FlinkFuture.java:201)
	at akka.dispatch.Recover.internal(Future.scala:268)
	at akka.dispatch.japi$RecoverBridge.apply(Future.scala:184)
	at akka.dispatch.japi$RecoverBridge.apply(Future.scala:182)
	at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
	at scala.util.Try$.apply(Try.scala:161)
	at scala.util.Failure.recover(Try.scala:185)
	at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
	at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@172.18.0.3:37959/user/taskmanager#364916492]] after [10000 ms]
	at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
	at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
	at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
	at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
	at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
	at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
	at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
	at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
	... 1 more



javalass

Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different nodes

Hi Till,


Thank you very much for that. And thanks for your help. I have finally managed to get the multi-cloud setup on Docker Swarm working by tweaking the Flink image slightly to set these configuration options to known values. I have also used the Weave Net Docker plugin to create my cross-cloud network.


I am in the process of documenting my experience in a blog article, which I will share on this list so others can hopefully benefit from it.


Thank you and the rest of the Flink team once again for all your help and support.


Best wishes,


Thalita


From: Till Rohrmann <[hidden email]>
Sent: 10 November 2017 12:15:00
To: Vergilio, Thalita
Cc: Piotr Nowojski; [hidden email]; Patrick Lucas
Subject: Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different nodes
 

Hi Thalita, yes you can use the mentioned configuration parameters to set the ports for the TaskManager and the BlobServer. However, you must make sure that there is at most one TM running on a host, otherwise you run into port collisions.

For taskmanager.rpc.port and blob.server.port you can define a range.

Cheers,
Till

​

On Fri, Nov 10, 2017 at 11:47 AM, Vergilio, Thalita <[hidden email]> wrote:

Hi All,


I just wanted to let you know that I have finally managed to get the multi-cloud setup working!! I honestly can't believe my eyes. I used a Docker plugin called Weave to create the Swarm network, a public external IP address for each node and opened a range of ports, and I can now get my Google Cloud machine to connect to the Azure machines.


There are still some minor issues, i.e. I don't know which exact ports to open for TaskManager communication in Flink. They seem to be getting assigned randomly at runtime, so I had to open a wide range of ports to allow the communication to happen, which is far from ideal.


Is there a way of finding out what these ports are and setting them to a constant value? Looking at the documentation, the suspects are:


  • taskmanager.rpc.port: The task manager’s IPC port (DEFAULT: 0, which lets the OS choose a free port).

  • taskmanager.data.port: The task manager’s port used for data exchange operations (DEFAULT: 0, which lets the OS choose a free port).

  • blob.server.port: Port definition for the blob server (serving user JARs) on the TaskManagers. By default the port is set to 0, which means that the operating system is picking an ephemeral port. Flink also accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple JobManagers are running on the same machine.

Many thanks,


Thalita



From: Vergilio, Thalita
Sent: 09 November 2017 22:04:24
To: Till Rohrmann

Cc: Piotr Nowojski; [hidden email]; Patrick Lucas
Subject: Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different nodes
 

Hi Till,


I have made some progress with the name resolution for machines that are not in the same subnet. The problem I am facing now is Flink-specific, so I wonder if you could help me.


It is all running fine in a multi-cloud setup with the jobmanager in Azure and the taskmanager in the Google cloud. However, when I scale the taskmanager up and it start running on Azure nodes as well, I get an Akka error which I presume means the taskmanagers can't talk to each other when parallelising the task.


Do you know what the IP address and port below are? Are they assigned by Flink? 


Thank you very much.


Thalita


java.lang.Exception: Cannot deploy task Source: Read(UnboundedKafkaSource) -> Flat Map -> KafkaPuePipelineProcessor/Window.Into()/Window.Assign.out -> ParMultiDo(Anonymous) -> ToKeyedWorkItem (2/3) (b9f31626fb7d83d39e24e570e034f03e) - TaskManager (3a9c37463c88510a44097df0c99b5f90 @ 172.18.0.3 (dataPort=38963)) not responding after a timeout of 10000 ms
	at org.apache.flink.runtime.executiongraph.Execution$2.apply(Execution.java:437)
	at org.apache.flink.runtime.executiongraph.Execution$2.apply(Execution.java:429)
	at org.apache.flink.runtime.concurrent.impl.FlinkFuture$3.recover(FlinkFuture.java:201)
	at akka.dispatch.Recover.internal(Future.scala:268)
	at akka.dispatch.japi$RecoverBridge.apply(Future.scala:184)
	at akka.dispatch.japi$RecoverBridge.apply(Future.scala:182)
	at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
	at scala.util.Try$.apply(Try.scala:161)
	at scala.util.Failure.recover(Try.scala:185)
	at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
	at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@172.18.0.3:37959/user/taskmanager#364916492]] after [10000 ms]
	at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
	at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
	at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
	at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
	at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
	at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
	at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
	at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
	... 1 more


From: Till Rohrmann <[hidden email]>
Sent: 06 November 2017 13:48:59
To: Vergilio, Thalita
Cc: Piotr Nowojski; [hidden email]; Patrick Lucas
Subject: Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different nodes
 
I'm not entirely sure how docker swarm works but from the Flink perspective there mustn't be two TaskManagers running on the same host (meaning an entity where you share the same address) if you set the TaskManager data port to a fixed value (otherwise only one of them can be started due to port conflicts). If you can ensure that this is the case, then it should be save to specify a port for the data transmission.

Cheers,
Till

On Mon, Nov 6, 2017 at 2:37 PM, Vergilio, Thalita <[hidden email]> wrote:

Hi Till,


Thanks a lot for your answer. 


Is the taskmanager.data.port unique per TaskManager? The documentation says it is assigned at runtime by the OS. My thinking here is that you would need to know what that is at service creation time, which would go against the whole idea of how services are scaled in Docker Swarm.


When you create a Swarm service using 'docker stack deploy' or 'docker service create', the configuration that is used at that point is the same that will be used by all instances of the service. If you then scale TaskManager to 8 or 10 containers, each of them gets the same service configuration(the one used to create the service).


I have in fact tried to map specific ports in the TaskManager service configuration, but then I got "port already in use" when I tried to scale up the service. 


I wonder if there is a way around it.


Maybe the people who developed the create-docker-swarm-service.sh script in the docker-flink project would be able to shed some light?




From: Till Rohrmann <[hidden email]>
Sent: 06 November 2017 12:40:33
To: Piotr Nowojski
Cc: Vergilio, Thalita; [hidden email]; Patrick Lucas

Subject: Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different nodes
 
Hi Thalita,

in order to make Flink work, I think you have to expose the JobManager RPC port, the Blob server port and make sure that the TaskManager can talk to each other by exposing the `taskmanager.data.port`. The query server port is only necessary if you want to use queryable state. 

I've pulled in Patrick who has more experience with running Flink on top of Docker. He'll definitely be able to provide more detailed recommendations.

Cheers,
Till

On Mon, Nov 6, 2017 at 9:22 AM, Piotr Nowojski <[hidden email]> wrote:
Till, is there somewhere a list of ports that need to exposed that’s more up to date compared to docker-flunk README?

Piotrek

On 3 Nov 2017, at 10:23, Vergilio, Thalita <[hidden email]> wrote:

Just an update: by changing the JOB_MANAGER_RPC_ADDRESS to the public IP of the JobManager and exposing port 6123 as {{PUBLIC_IP}}:6123:6123, I manged to get the TaskManagers from different nodes and even different subnets to talk to the JobManager.

This is how I created the services:

docker network create -d overlay overlay

docker service create --name jobmanager --env JOB_MANAGER_RPC_ADDRESS={{PUBLIC_IP}}  -p 8081:8081 -p{{PUBLIC_IP}}:6123:6123 -p 48081:48081 -p 6124:6124 -p 6125:6125 --network overlay --constraint 'node.hostname == ubuntu-swarm-manager' flink jobmanager

docker service create --name taskmanager --env JOB_MANAGER_RPC_ADDRESS={{PUBLIC_IP}}  -p 6121:6121 -p 6122:6122  --network overlay --constraint 'node.hostname != ubuntu-swarm-manager' flink taskmanager

However, I am still encountering errors further down the line. When I submit a job using the Web UI, it fails because the JobManager can't talk to the TaskManager on port 35033. I presume this is the taskmanager.data.port, which needs to be set to a range and this range exposed when I create the service?

Are there any other ports that I need to open at service creation time?

Connecting the channel failed: Connecting to remote task manager + '/{{IP_ADDRESS_OF_MANAGER}}:35033' has failed. This might indicate that the remote task manager has been lost.
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:131)
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:83)
	at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59)
	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:112)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:433)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:455)
	at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:91)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:748)



From: Piotr Nowojski <[hidden email]>
Sent: 02 November 2017 14:26:32
To: Vergilio, Thalita
Cc: [hidden email]
Subject: Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different nodes
 
Did you try to expose required ports that are listed in the README when starting the containers?

https://github.com/apache/flink/tree/master/flink-contrib/docker-flink

Ports:
• The Web Client is on port 48081
• JobManager RPC port 6123 (default, not exposed to host)
• TaskManagers RPC port 6122 (default, not exposed to host)
• TaskManagers Data port 6121 (default, not exposed to host)

Piotrek

On 2 Nov 2017, at 14:44, javalass <[hidden email]> wrote:

I am using the Docker-Flink project in:
https://github.com/apache/flink/tree/master/flink-contrib/docker-flink 

I am creating the services with the following commands:
docker network create -d overlay overlay
docker service create --name jobmanager --env
JOB_MANAGER_RPC_ADDRESS=jobmanager -p 8081:8081 --network overlay
--constraint 'node.hostname == ubuntu-swarm-manager' flink jobmanager
docker service create --name taskmanager --env
JOB_MANAGER_RPC_ADDRESS=jobmanager --network overlay --constraint
'node.hostname != ubuntu-swarm-manager' flink taskmanager

I wonder if there's any configuration I'm missing. This is the error I get:
- Trying to register at JobManager akka.tcp://flink@jobmanager:6123/  
user/jobmanager (attempt 4, timeout: 4000 milliseconds)







--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

To view the terms under which this email is distributed, please go to:- 
http://disclaimer.leedsbeckett.ac.uk/disclaimer/disclaimer.html



Till Rohrmann

Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different nodes

Great to hear that you got it working :-) Looking forward to your blog post to learn more about your experiences :-)

Cheers,
Till

On Fri, Nov 10, 2017 at 10:18 PM, Vergilio, Thalita <[hidden email]> wrote:

Hi Till,


Thank you very much for that. And thanks for your help. I have finally managed to get the multi-cloud setup on Docker Swarm working by tweaking the Flink image slightly to set these configuration options to known values. I have also used the Weave Net Docker plugin to create my cross-cloud network.


I am in the process of documenting my experience in a blog article, which I will share in this list so others can hopefully benefit from it.


Thank you and the rest of the Flink team once again for all your help and support.


Best wishes,


Thalita


From: Till Rohrmann <[hidden email]>
Sent: 10 November 2017 12:15:00

To: Vergilio, Thalita
Cc: Piotr Nowojski; [hidden email]; Patrick Lucas
Subject: Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different nodes
 

Hi Thalita, yes you can use the mentioned configuration parameters to set the ports for the TaskManager and the BlobServer. However, you must make sure that there is at most one TM running on a host, otherwise you run into port collisions.

For taskmanager.rpc.port and blob.server.port you can define a range.
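A minimal sketch of pinning these ports by appending them to the Flink configuration file, for instance while building a tweaked image. The helper name set_flink_ports, the example path, and the port values are assumptions for illustration (6122 and 6121 echo the README list quoted in this thread, 6124 is arbitrary); only the three configuration keys come from the discussion above.

```shell
#!/bin/sh
# Sketch: append fixed port settings to a Flink config file.
# The keys are the ones discussed in this thread; the values are examples only.
set_flink_ports() {
    conf_file="$1"
    {
        echo "taskmanager.rpc.port: 6122"
        echo "taskmanager.data.port: 6121"
        echo "blob.server.port: 6124"
    } >> "$conf_file"
}

# Usage (hypothetical path inside the image):
# set_flink_ports /opt/flink/conf/flink-conf.yaml
```

With the ports fixed this way, only those known ports need to be opened between hosts, at the cost of allowing at most one TaskManager per host.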

Cheers,
Till


On Fri, Nov 10, 2017 at 11:47 AM, Vergilio, Thalita <[hidden email]> wrote:

Hi All,


I just wanted to let you know that I have finally managed to get the multi-cloud setup working!! I honestly can't believe my eyes. I used a Docker plugin called Weave to create the Swarm network, gave each node a public external IP address, and opened a range of ports, and I can now get my Google Cloud machine to connect to the Azure machines.


There are still some minor issues: I don't know exactly which ports to open for TaskManager communication in Flink. They seem to be assigned randomly at runtime, so I had to open a wide range of ports to allow the communication to happen, which is far from ideal.


Is there a way of finding out what these ports are and setting them to a constant value? Looking at the documentation, the suspects are:


  • taskmanager.rpc.port: The task manager’s IPC port (DEFAULT: 0, which lets the OS choose a free port).

  • taskmanager.data.port: The task manager’s port used for data exchange operations (DEFAULT: 0, which lets the OS choose a free port).

  • blob.server.port: Port definition for the blob server (serving user JARs) on the TaskManagers. By default the port is set to 0, which means that the operating system is picking an ephemeral port. Flink also accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple JobManagers are running on the same machine.

Many thanks,


Thalita



From: Vergilio, Thalita
Sent: 09 November 2017 22:04:24
To: Till Rohrmann

Cc: Piotr Nowojski; [hidden email]; Patrick Lucas
Subject: Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different nodes
 

Hi Till,


I have made some progress with the name resolution for machines that are not in the same subnet. The problem I am facing now is Flink-specific, so I wonder if you could help me.


It is all running fine in a multi-cloud setup with the jobmanager in Azure and the taskmanager in the Google cloud. However, when I scale the taskmanager up and it starts running on Azure nodes as well, I get an Akka error which I presume means the taskmanagers can't talk to each other when parallelising the task.


Do you know what the IP address and port below are? Are they assigned by Flink? 


Thank you very much.


Thalita


java.lang.Exception: Cannot deploy task Source: Read(UnboundedKafkaSource) -> Flat Map -> KafkaPuePipelineProcessor/Window.Into()/Window.Assign.out -> ParMultiDo(Anonymous) -> ToKeyedWorkItem (2/3) (b9f31626fb7d83d39e24e570e034f03e) - TaskManager (3a9c37463c88510a44097df0c99b5f90 @ 172.18.0.3 (dataPort=38963)) not responding after a timeout of 10000 ms
	at org.apache.flink.runtime.executiongraph.Execution$2.apply(Execution.java:437)
	at org.apache.flink.runtime.executiongraph.Execution$2.apply(Execution.java:429)
	at org.apache.flink.runtime.concurrent.impl.FlinkFuture$3.recover(FlinkFuture.java:201)
	at akka.dispatch.Recover.internal(Future.scala:268)
	at akka.dispatch.japi$RecoverBridge.apply(Future.scala:184)
	at akka.dispatch.japi$RecoverBridge.apply(Future.scala:182)
	at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
	at scala.util.Try$.apply(Try.scala:161)
	at scala.util.Failure.recover(Try.scala:185)
	at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
	at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@172.18.0.3:37959/user/taskmanager#364916492]] after [10000 ms]
	at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
	at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
	at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
	at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
	at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
	at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
	at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
	at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
	... 1 more
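
In the trace above, 172.18.0.3 appears to be the address the TaskManager registered under, dataPort=38963 its data port, and 37959 (in the akka.tcp URL) its RPC port, all picked at runtime. A small sketch for pulling the address and data port pairs out of a log follows; the helper name extract_data_ports is an assumption, and it only matches text shaped like the message above.

```shell
#!/bin/sh
# Sketch: list TaskManager address:dataPort pairs mentioned in log text.
# Matches fragments shaped like "@ 172.18.0.3 (dataPort=38963)".
# Reads the files given as arguments, or standard input if none are given.
extract_data_ports() {
    cat "$@" |
        grep -o '@ [0-9.]* (dataPort=[0-9]*)' |
        sed 's/^@ \([0-9.]*\) (dataPort=\([0-9]*\))$/\1:\2/' |
        sort -u
}

# Usage (hypothetical log file name):
# extract_data_ports jobmanager.log
```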


From: Till Rohrmann <[hidden email]>
Sent: 06 November 2017 13:48:59
To: Vergilio, Thalita
Cc: Piotr Nowojski; [hidden email]; Patrick Lucas
Subject: Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different nodes
 
I'm not entirely sure how docker swarm works but from the Flink perspective there mustn't be two TaskManagers running on the same host (meaning an entity where you share the same address) if you set the TaskManager data port to a fixed value (otherwise only one of them can be started due to port conflicts). If you can ensure that this is the case, then it should be safe to specify a port for the data transmission.
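
One way to get at most one TaskManager per node in Docker Swarm is a global service, which runs exactly one task on each eligible node. The sketch below only prints such a command for review. The image name my-flink (standing in for an image whose data port is fixed at 6121) and the helper name taskmanager_service_cmd are assumptions, not something confirmed in this thread.

```shell
#!/bin/sh
# Sketch: build a "one TaskManager per node" service command.
# --mode global asks Swarm to run exactly one task on each eligible node.
# mode=host publishes the port directly on the node running the task.
taskmanager_service_cmd() {
    image="$1"
    data_port="$2"
    echo docker service create --name taskmanager \
        --mode global \
        --env JOB_MANAGER_RPC_ADDRESS=jobmanager \
        --publish "mode=host,target=${data_port},published=${data_port}" \
        --network overlay \
        "$image" taskmanager
}

# Print the command for review before running it by hand.
taskmanager_service_cmd my-flink 6121
```

A global service cannot be scaled with replicas; the number of TaskManagers then follows the number of nodes that match the service's constraints.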

Cheers,
Till

On Mon, Nov 6, 2017 at 2:37 PM, Vergilio, Thalita <[hidden email]> wrote:

Hi Till,


Thanks a lot for your answer. 


Is the taskmanager.data.port unique per TaskManager? The documentation says it is assigned at runtime by the OS. My thinking here is that you would need to know what that is at service creation time, which would go against the whole idea of how services are scaled in Docker Swarm.


When you create a Swarm service using 'docker stack deploy' or 'docker service create', the configuration that is used at that point is the same that will be used by all instances of the service. If you then scale TaskManager to 8 or 10 containers, each of them gets the same service configuration (the one used to create the service).


I have in fact tried to map specific ports in the TaskManager service configuration, but then I got "port already in use" when I tried to scale up the service. 


I wonder if there is a way around it.


Maybe the people who developed the create-docker-swarm-service.sh script in the docker-flink project would be able to shed some light?




From: Till Rohrmann <[hidden email]>
Sent: 06 November 2017 12:40:33
To: Piotr Nowojski
Cc: Vergilio, Thalita; [hidden email]; Patrick Lucas

Subject: Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different nodes
 
Hi Thalita,

in order to make Flink work, I think you have to expose the JobManager RPC port, the Blob server port and make sure that the TaskManagers can talk to each other by exposing the `taskmanager.data.port`. The query server port is only necessary if you want to use queryable state.

I've pulled in Patrick who has more experience with running Flink on top of Docker. He'll definitely be able to provide more detailed recommendations.

Cheers,
Till

On Mon, Nov 6, 2017 at 9:22 AM, Piotr Nowojski <[hidden email]> wrote:
Till, is there somewhere a list of ports that need to be exposed that's more up to date than the docker-flink README?

Piotrek

On 3 Nov 2017, at 10:23, Vergilio, Thalita <[hidden email]> wrote:

Just an update: by changing the JOB_MANAGER_RPC_ADDRESS to the public IP of the JobManager and exposing port 6123 as {{PUBLIC_IP}}:6123:6123, I managed to get the TaskManagers from different nodes and even different subnets to talk to the JobManager.

This is how I created the services:

docker network create -d overlay overlay

docker service create --name jobmanager --env JOB_MANAGER_RPC_ADDRESS={{PUBLIC_IP}}  -p 8081:8081 -p{{PUBLIC_IP}}:6123:6123 -p 48081:48081 -p 6124:6124 -p 6125:6125 --network overlay --constraint 'node.hostname == ubuntu-swarm-manager' flink jobmanager

docker service create --name taskmanager --env JOB_MANAGER_RPC_ADDRESS={{PUBLIC_IP}}  -p 6121:6121 -p 6122:6122  --network overlay --constraint 'node.hostname != ubuntu-swarm-manager' flink taskmanager

However, I am still encountering errors further down the line. When I submit a job using the Web UI, it fails because the JobManager can't talk to the TaskManager on port 35033. I presume this is the taskmanager.data.port, which needs to be set to a range and this range exposed when I create the service?

Are there any other ports that I need to open at service creation time?

Connecting the channel failed: Connecting to remote task manager + '/{{IP_ADDRESS_OF_MANAGER}}:35033' has failed. This might indicate that the remote task manager has been lost.
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:131)
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:83)
	at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59)
	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:112)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:433)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:455)
	at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:91)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:748)



From: Piotr Nowojski <[hidden email]>
Sent: 02 November 2017 14:26:32
To: Vergilio, Thalita
Cc: [hidden email]
Subject: Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different nodes
 
Did you try to expose required ports that are listed in the README when starting the containers?

https://github.com/apache/flink/tree/master/flink-contrib/docker-flink

Ports:
• The Web Client is on port 48081
• JobManager RPC port 6123 (default, not exposed to host)
• TaskManagers RPC port 6122 (default, not exposed to host)
• TaskManagers Data port 6121 (default, not exposed to host)

Piotrek

On 2 Nov 2017, at 14:44, javalass <[hidden email]> wrote:

I am using the Docker-Flink project in:
https://github.com/apache/flink/tree/master/flink-contrib/docker-flink 

I am creating the services with the following commands:
docker network create -d overlay overlay
docker service create --name jobmanager --env
JOB_MANAGER_RPC_ADDRESS=jobmanager -p 8081:8081 --network overlay
--constraint 'node.hostname == ubuntu-swarm-manager' flink jobmanager
docker service create --name taskmanager --env
JOB_MANAGER_RPC_ADDRESS=jobmanager --network overlay --constraint
'node.hostname != ubuntu-swarm-manager' flink taskmanager

I wonder if there's any configuration I'm missing. This is the error I get:
- Trying to register at JobManager akka.tcp://flink@jobmanager:6123/  
user/jobmanager (attempt 4, timeout: 4000 milliseconds)







--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


