flink yarn-session failure

Stefanos Antaris
Hi all,

I am trying to use Flink with Hadoop YARN, but I am facing an exception while trying to create a yarn-session.

First of all, I have a Hadoop cluster with 20 VMs that uses YARN. I can start the Hadoop cluster and run Hadoop jobs without any problem. I am now trying to deploy a Flink cluster on the same VMs using the Flink YARN client. I have the HADOOP_HOME environment variable set and the Hadoop cluster up and running. When I execute the ./bin/yarn-session.sh -n 10 -tm 8192 -s 32 command, I get the exception below. Can someone explain to me how to solve this?
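
For reference, here is how I read the flags (as I understand them from the Flink 0.10 YARN client documentation):

# -n 10     request 10 TaskManager containers from YARN
# -tm 8192  memory per TaskManager container, in MB
# -s 32     task slots offered by each TaskManager
./bin/yarn-session.sh -n 10 -tm 8192 -s 32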

10:20:56,105 INFO  org.apache.hadoop.yarn.client.RMProxy                         - Connecting to ResourceManager at master/192.168.0.194:8032
10:20:56,353 WARN  org.apache.hadoop.util.NativeCodeLoader                       - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
10:20:57,095 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Using values:
10:20:57,097 INFO  org.apache.flink.yarn.FlinkYarnClient                         -  TaskManager count = 10
10:20:57,097 INFO  org.apache.flink.yarn.FlinkYarnClient                         -  JobManager memory = 1024
10:20:57,097 INFO  org.apache.flink.yarn.FlinkYarnClient                         -  TaskManager memory = 2048
10:20:57,365 WARN  org.apache.flink.yarn.FlinkYarnClient                         - This YARN session requires 21504MB of memory in the cluster. There are currently only 8192MB available.
The Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are connecting from the beginning because the resources are currently not available in the cluster. The allocation might take more time than usual because the Flink YARN client needs to wait until the resources become available.
10:20:57,365 WARN  org.apache.flink.yarn.FlinkYarnClient                         - There is not enough memory available in the YARN cluster. The TaskManager(s) require 2048MB each. NodeManagers available: [8192]
After allocating the JobManager (1024MB) and (3/10) TaskManagers, the following NodeManagers are available: [1024]
The Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are connecting from the beginning because the resources are currently not available in the cluster. The allocation might take more time than usual because the Flink YARN client needs to wait until the resources become available.
10:20:57,365 WARN  org.apache.flink.yarn.FlinkYarnClient                         - There is not enough memory available in the YARN cluster. The TaskManager(s) require 2048MB each. NodeManagers available: [8192]
After allocating the JobManager (1024MB) and (4/10) TaskManagers, the following NodeManagers are available: [1024]
The Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are connecting from the beginning because the resources are currently not available in the cluster. The allocation might take more time than usual because the Flink YARN client needs to wait until the resources become available.
10:20:57,366 WARN  org.apache.flink.yarn.FlinkYarnClient                         - There is not enough memory available in the YARN cluster. The TaskManager(s) require 2048MB each. NodeManagers available: [8192]
After allocating the JobManager (1024MB) and (5/10) TaskManagers, the following NodeManagers are available: [1024]
The Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are connecting from the beginning because the resources are currently not available in the cluster. The allocation might take more time than usual because the Flink YARN client needs to wait until the resources become available.
10:20:57,366 WARN  org.apache.flink.yarn.FlinkYarnClient                         - There is not enough memory available in the YARN cluster. The TaskManager(s) require 2048MB each. NodeManagers available: [8192]
After allocating the JobManager (1024MB) and (6/10) TaskManagers, the following NodeManagers are available: [1024]
The Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are connecting from the beginning because the resources are currently not available in the cluster. The allocation might take more time than usual because the Flink YARN client needs to wait until the resources become available.
10:20:57,366 WARN  org.apache.flink.yarn.FlinkYarnClient                         - There is not enough memory available in the YARN cluster. The TaskManager(s) require 2048MB each. NodeManagers available: [8192]
After allocating the JobManager (1024MB) and (7/10) TaskManagers, the following NodeManagers are available: [1024]
The Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are connecting from the beginning because the resources are currently not available in the cluster. The allocation might take more time than usual because the Flink YARN client needs to wait until the resources become available.
10:20:57,366 WARN  org.apache.flink.yarn.FlinkYarnClient                         - There is not enough memory available in the YARN cluster. The TaskManager(s) require 2048MB each. NodeManagers available: [8192]
After allocating the JobManager (1024MB) and (8/10) TaskManagers, the following NodeManagers are available: [1024]
The Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are connecting from the beginning because the resources are currently not available in the cluster. The allocation might take more time than usual because the Flink YARN client needs to wait until the resources become available.
10:20:57,366 WARN  org.apache.flink.yarn.FlinkYarnClient                         - There is not enough memory available in the YARN cluster. The TaskManager(s) require 2048MB each. NodeManagers available: [8192]
After allocating the JobManager (1024MB) and (9/10) TaskManagers, the following NodeManagers are available: [1024]
The Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are connecting from the beginning because the resources are currently not available in the cluster. The allocation might take more time than usual because the Flink YARN client needs to wait until the resources become available.
10:20:58,204 INFO  org.apache.flink.yarn.Utils                                   - Copying from file:/home/hduser/flink-0.10.0/lib/flink-dist-0.10.0.jar to hdfs://master:54310/user/hduser/.flink/application_1447928096470_0002/flink-dist-0.10.0.jar
10:21:00,235 INFO  org.apache.flink.yarn.Utils                                   - Copying from /home/hduser/flink-0.10.0/conf/flink-conf.yaml to hdfs://master:54310/user/hduser/.flink/application_1447928096470_0002/flink-conf.yaml
10:21:00,277 INFO  org.apache.flink.yarn.Utils                                   - Copying from file:/home/hduser/flink-0.10.0/lib/log4j-1.2.17.jar to hdfs://master:54310/user/hduser/.flink/application_1447928096470_0002/log4j-1.2.17.jar
10:21:00,349 INFO  org.apache.flink.yarn.Utils                                   - Copying from file:/home/hduser/flink-0.10.0/lib/slf4j-log4j12-1.7.7.jar to hdfs://master:54310/user/hduser/.flink/application_1447928096470_0002/slf4j-log4j12-1.7.7.jar
10:21:00,400 INFO  org.apache.flink.yarn.Utils                                   - Copying from file:/home/hduser/flink-0.10.0/lib/flink-python-0.10.0.jar to hdfs://master:54310/user/hduser/.flink/application_1447928096470_0002/flink-python-0.10.0.jar
10:21:00,441 INFO  org.apache.flink.yarn.Utils                                   - Copying from file:/home/hduser/flink-0.10.0/conf/logback.xml to hdfs://master:54310/user/hduser/.flink/application_1447928096470_0002/logback.xml
10:21:00,486 INFO  org.apache.flink.yarn.Utils                                   - Copying from file:/home/hduser/flink-0.10.0/conf/log4j.properties to hdfs://master:54310/user/hduser/.flink/application_1447928096470_0002/log4j.properties
10:21:00,553 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Submitting application master application_1447928096470_0002
10:21:00,963 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl         - Submitted application application_1447928096470_0002
10:21:00,964 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Waiting for the cluster to be allocated
10:21:00,969 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
10:21:01,973 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
10:21:02,977 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
10:21:03,982 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
10:21:04,986 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
10:21:05,990 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
10:21:06,994 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
10:21:07,996 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
10:21:09,003 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
10:21:10,007 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
10:21:11,011 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
Error while deploying YARN cluster: The YARN application unexpectedly switched to state FAILED during deployment. 
Diagnostics from YARN: Application application_1447928096470_0002 failed 1 times due to Error launching appattempt_1447928096470_0002_000001. Got exception: java.net.ConnectException: Call From flink-master/127.0.0.1 to localhost:38425 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:792)
at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:732)
at org.apache.hadoop.ipc.Client.call(Client.java:1480)
at org.apache.hadoop.ipc.Client.call(Client.java:1407)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
at com.sun.proxy.$Proxy31.startContainers(Unknown Source)
at org.apache.hadoop.yarn.api.impl.pb.client.ContainerManagementProtocolPBClientImpl.startContainers(ContainerManagementProtocolPBClientImpl.java:96)
at org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher.launch(AMLauncher.java:119)
at org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher.run(AMLauncher.java:254)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:744)
at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:495)
at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:609)
at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:707)
at org.apache.hadoop.ipc.Client$Connection.access$2800(Client.java:370)
at org.apache.hadoop.ipc.Client.getConnection(Client.java:1529)
at org.apache.hadoop.ipc.Client.call(Client.java:1446)
... 9 more
. Failing the application.
If log aggregation is enabled on your cluster, use this command to further investigate the issue:
yarn logs -applicationId application_1447928096470_0002
org.apache.flink.yarn.FlinkYarnClientBase$YarnDeploymentException: The YARN application unexpectedly switched to state FAILED during deployment. 
Diagnostics from YARN: Application application_1447928096470_0002 failed 1 times due to Error launching appattempt_1447928096470_0002_000001. Got exception: java.net.ConnectException: Call From flink-master/127.0.0.1 to localhost:38425 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:792)
at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:732)
at org.apache.hadoop.ipc.Client.call(Client.java:1480)
at org.apache.hadoop.ipc.Client.call(Client.java:1407)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
at com.sun.proxy.$Proxy31.startContainers(Unknown Source)
at org.apache.hadoop.yarn.api.impl.pb.client.ContainerManagementProtocolPBClientImpl.startContainers(ContainerManagementProtocolPBClientImpl.java:96)
at org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher.launch(AMLauncher.java:119)
at org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher.run(AMLauncher.java:254)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:744)
at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:495)
at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:609)
at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:707)
at org.apache.hadoop.ipc.Client$Connection.access$2800(Client.java:370)
at org.apache.hadoop.ipc.Client.getConnection(Client.java:1529)
at org.apache.hadoop.ipc.Client.call(Client.java:1446)
... 9 more
. Failing the application.
If log aggregation is enabled on your cluster, use this command to further investigate the issue:
yarn logs -applicationId application_1447928096470_0002
at org.apache.flink.yarn.FlinkYarnClientBase.deployInternal(FlinkYarnClientBase.java:646)
at org.apache.flink.yarn.FlinkYarnClientBase.deploy(FlinkYarnClientBase.java:338)
at org.apache.flink.client.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:409)
at org.apache.flink.client.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:351)




Just to mention, my flink-conf.yaml is the following:
#==============================================================================
# Common
#==============================================================================

# The host on which the JobManager runs. Only used in non-high-availability mode.
# The JobManager process will use this hostname to bind the listening servers to.
# The TaskManagers will try to connect to the JobManager on that host.

jobmanager.rpc.address: master


# The port where the JobManager's main actor system listens for messages.

jobmanager.rpc.port: 6123


# The heap size for the JobManager JVM

jobmanager.heap.mb: 256


# The heap size for the TaskManager JVM

taskmanager.heap.mb: 512


# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.

taskmanager.numberOfTaskSlots: 10


# The parallelism used for programs that did not specify any other parallelism.

parallelism.default: 5


#==============================================================================
# Web Frontend
#==============================================================================

# The port under which the web-based runtime monitor listens.
# A value of -1 deactivates the web server.

jobmanager.web.port: 8081


# The port under which the standalone web client
# (for job upload and submit) listens.

webclient.port: 8080


#==============================================================================
# Streaming state checkpointing
#==============================================================================

# The backend that will be used to store operator state checkpoints if 
# checkpointing is enabled. 
#
# Supported backends: jobmanager, filesystem, <class-name-of-factory> 
#
#state.backend: filesystem


# Directory for storing checkpoints in a Flink-supported filesystem
# Note: State backend must be accessible from the JobManager and all TaskManagers.
# Use "hdfs://" for HDFS setups, "file://" for UNIX/POSIX-compliant file systems,
# (or any local file system under Windows), or "S3://" for S3 file system.
#
# state.backend.fs.checkpointdir: hdfs://namenode-host:port/flink-checkpoints


#==============================================================================
# Advanced
#==============================================================================

# The number of buffers for the network stack.
#
# taskmanager.network.numberOfBuffers: 2048


# Directories for temporary files.
#
# Add a delimited list for multiple directories, using the system directory
# delimiter (colon ':' on unix) or a comma, e.g.:
#     /data1/tmp:/data2/tmp:/data3/tmp
#
# Note: Each directory entry is read from and written to by a different I/O
# thread. You can include the same directory multiple times in order to create
# multiple I/O threads against that directory. This is for example relevant for
# high-throughput RAIDs.
#
# If not specified, the system-specific Java temporary directory (java.io.tmpdir
# property) is taken.
#
# taskmanager.tmp.dirs: /tmp


# Path to the Hadoop configuration directory.
#
# This configuration is used when writing into HDFS. Unless specified otherwise,
# HDFS file creation will use HDFS default settings with respect to block-size,
# replication factor, etc.
#
# You can also directly specify the paths to hdfs-default.xml and hdfs-site.xml
# via keys 'fs.hdfs.hdfsdefault' and 'fs.hdfs.hdfssite'.
#
fs.hdfs.hadoopconf: /usr/local/hadoop/etc/hadoop/


#==============================================================================
# Master High Availability (required configuration)
#==============================================================================

# The list of ZooKeeper quorum peers that coordinate the high-availability
# setup. This must be a list of the form:
# "host1:clientPort,host2[:clientPort],..." (default clientPort: 2181)
#
# recovery.mode: zookeeper
#
# recovery.zookeeper.quorum: localhost:2181,...
#
# Note: You need to set the state backend to 'filesystem' and the checkpoint
# directory (see above) before configuring the storageDir.
#
# recovery.zookeeper.storageDir: hdfs:///recovery

Thanks in advance,
Stefanos Antaris

Re: flink yarn-session failure

rmetzger0
The exception is thrown even before any Flink code is executed, so I assume that your YARN setup is not working properly.
Did you try running any other YARN application on this setup? I suspect that other systems like MapReduce or Spark will not run in this environment either.

Maybe the yarn-site.xml on the NodeManager hosts is not correct (pointing to localhost instead of the master).
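
For illustration, this is the kind of setting I mean: on every NodeManager host, the ResourceManager address should name the master, not localhost. A minimal fragment might look like this (yarn.resourcemanager.hostname is the standard Hadoop 2.x shorthand property; "master" is just the host name your cluster uses):

<property>
    <name>yarn.resourcemanager.hostname</name>
    <value>master</value>
</property>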

On Thu, Nov 19, 2015 at 11:41 AM, Stefanos Antaris <[hidden email]> wrote:
[...]


Re: flink yarn-session failure

Stefanos Antaris
Yes, you are right: I cannot run any YARN application. However, I have no localhost in my yarn-site.xml.

<configuration>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <property>
        <name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
        <value>org.apache.hadoop.mapred.ShuffleHandler</value>
    </property>
    <property>
        <name>yarn.resourcemanager.resource-tracker.address</name>
        <value>master:8025</value>
    </property>
    <property>
        <name>yarn.resourcemanager.scheduler.address</name>
        <value>master:8030</value>
    </property>
    <property>
        <name>yarn.resourcemanager.address</name>
        <value>master:8050</value>
    </property>
</configuration>


Could someone provide me with a correct yarn-site.xml to make this work? Should the yarn-site.xml be the same on both the NameNode and the DataNodes? Sorry for this question, but different tutorials on Google refer to different configurations and I am confused.
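
One more observation: the YARN diagnostics above show the failing call originating from flink-master/127.0.0.1, which makes me wonder whether the hostname resolves to the loopback address on the machines (just a guess based on the error message). I assume something like this would show it:

hostname                      # e.g. flink-master
getent hosts "$(hostname)"    # should print the VM's real IP, not 127.0.0.1
cat /etc/hosts                # 'master' and the workers should map to real IPs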

Thanks,
Stefanos

On 19 Nov 2015, at 12:47, Robert Metzger <[hidden email]> wrote:

The exception is thrown even before Flink code is executed, so I assume that your YARN setup is not properly working.
Did you try running any other YARN application on the setup? I suspect that other systems like MapReduce or Spark will also not run on the environment.

Maybe the yarn-site.xml on the NodeManager hosts is not correct (pointing to localhost instead of the master)

On Thu, Nov 19, 2015 at 11:41 AM, Stefanos Antaris <[hidden email]> wrote:
Hi to all,

i am trying to use Flink with Hadoop yarn but i am facing an exception while trying to create a yarn-session.

First of all, i have a Hadoop cluster with 20 VMs that uses yarn. I can start the Hadoop cluster and run Hadoop jobs without any problem. Furthermore, i am trying to deploy a Flink cluster on the same VMs and use the Flink Yarn client. I have the HADOOP_HOME environmental variable set and the hadoop cluster up and running. When i execute the ./bin/yarn-session.sh -n 10 -tm 8192 -s 32 command i have the following exception. Can someone explain me how to solve this?

10:20:56,105 INFO  org.apache.hadoop.yarn.client.RMProxy                         - Connecting to ResourceManager at master/192.168.0.194:8032
10:20:56,353 WARN  org.apache.hadoop.util.NativeCodeLoader                       - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
10:20:57,095 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Using values:
10:20:57,097 INFO  org.apache.flink.yarn.FlinkYarnClient                         -  TaskManager count = 10
10:20:57,097 INFO  org.apache.flink.yarn.FlinkYarnClient                         -  JobManager memory = 1024
10:20:57,097 INFO  org.apache.flink.yarn.FlinkYarnClient                         -  TaskManager memory = 2048
10:20:57,365 WARN  org.apache.flink.yarn.FlinkYarnClient                         - This YARN session requires 21504MB of memory in the cluster. There are currently only 8192MB available.
The Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are connecting from the beginning because the resources are currently not available in the cluster. The allocation might take more time than usual because the Flink YARN client needs to wait until the resources become available.
10:20:57,365 WARN  org.apache.flink.yarn.FlinkYarnClient                         - There is not enough memory available in the YARN cluster. The TaskManager(s) require 2048MB each. NodeManagers available: [8192]
After allocating the JobManager (1024MB) and (3/10) TaskManagers, the following NodeManagers are available: [1024]
The Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are connecting from the beginning because the resources are currently not available in the cluster. The allocation might take more time than usual because the Flink YARN client needs to wait until the resources become available.
10:20:57,365 WARN  org.apache.flink.yarn.FlinkYarnClient                         - There is not enough memory available in the YARN cluster. The TaskManager(s) require 2048MB each. NodeManagers available: [8192]
After allocating the JobManager (1024MB) and (4/10) TaskManagers, the following NodeManagers are available: [1024]
The Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are connecting from the beginning because the resources are currently not available in the cluster. The allocation might take more time than usual because the Flink YARN client needs to wait until the resources become available.
10:20:57,366 WARN  org.apache.flink.yarn.FlinkYarnClient                         - There is not enough memory available in the YARN cluster. The TaskManager(s) require 2048MB each. NodeManagers available: [8192]
After allocating the JobManager (1024MB) and (5/10) TaskManagers, the following NodeManagers are available: [1024]
The Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are connecting from the beginning because the resources are currently not available in the cluster. The allocation might take more time than usual because the Flink YARN client needs to wait until the resources become available.
10:20:57,366 WARN  org.apache.flink.yarn.FlinkYarnClient                         - There is not enough memory available in the YARN cluster. The TaskManager(s) require 2048MB each. NodeManagers available: [8192]
After allocating the JobManager (1024MB) and (6/10) TaskManagers, the following NodeManagers are available: [1024]
The Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are connecting from the beginning because the resources are currently not available in the cluster. The allocation might take more time than usual because the Flink YARN client needs to wait until the resources become available.
10:20:57,366 WARN  org.apache.flink.yarn.FlinkYarnClient                         - There is not enough memory available in the YARN cluster. The TaskManager(s) require 2048MB each. NodeManagers available: [8192]
After allocating the JobManager (1024MB) and (7/10) TaskManagers, the following NodeManagers are available: [1024]
The Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are connecting from the beginning because the resources are currently not available in the cluster. The allocation might take more time than usual because the Flink YARN client needs to wait until the resources become available.
10:20:57,366 WARN  org.apache.flink.yarn.FlinkYarnClient                         - There is not enough memory available in the YARN cluster. The TaskManager(s) require 2048MB each. NodeManagers available: [8192]
After allocating the JobManager (1024MB) and (8/10) TaskManagers, the following NodeManagers are available: [1024]
The Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are connecting from the beginning because the resources are currently not available in the cluster. The allocation might take more time than usual because the Flink YARN client needs to wait until the resources become available.
10:20:57,366 WARN  org.apache.flink.yarn.FlinkYarnClient                         - There is not enough memory available in the YARN cluster. The TaskManager(s) require 2048MB each. NodeManagers available: [8192]
After allocating the JobManager (1024MB) and (9/10) TaskManagers, the following NodeManagers are available: [1024]
The Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are connecting from the beginning because the resources are currently not available in the cluster. The allocation might take more time than usual because the Flink YARN client needs to wait until the resources become available.
10:20:58,204 INFO  org.apache.flink.yarn.Utils                                   - Copying from file:/home/hduser/flink-0.10.0/lib/flink-dist-0.10.0.jar to hdfs://master:54310/user/hduser/.flink/application_1447928096470_0002/flink-dist-0.10.0.jar
10:21:00,235 INFO  org.apache.flink.yarn.Utils                                   - Copying from /home/hduser/flink-0.10.0/conf/flink-conf.yaml to hdfs://master:54310/user/hduser/.flink/application_1447928096470_0002/flink-conf.yaml
10:21:00,277 INFO  org.apache.flink.yarn.Utils                                   - Copying from file:/home/hduser/flink-0.10.0/lib/log4j-1.2.17.jar to hdfs://master:54310/user/hduser/.flink/application_1447928096470_0002/log4j-1.2.17.jar
10:21:00,349 INFO  org.apache.flink.yarn.Utils                                   - Copying from file:/home/hduser/flink-0.10.0/lib/slf4j-log4j12-1.7.7.jar to hdfs://master:54310/user/hduser/.flink/application_1447928096470_0002/slf4j-log4j12-1.7.7.jar
10:21:00,400 INFO  org.apache.flink.yarn.Utils                                   - Copying from file:/home/hduser/flink-0.10.0/lib/flink-python-0.10.0.jar to hdfs://master:54310/user/hduser/.flink/application_1447928096470_0002/flink-python-0.10.0.jar
10:21:00,441 INFO  org.apache.flink.yarn.Utils                                   - Copying from file:/home/hduser/flink-0.10.0/conf/logback.xml to hdfs://master:54310/user/hduser/.flink/application_1447928096470_0002/logback.xml
10:21:00,486 INFO  org.apache.flink.yarn.Utils                                   - Copying from file:/home/hduser/flink-0.10.0/conf/log4j.properties to hdfs://master:54310/user/hduser/.flink/application_1447928096470_0002/log4j.properties
10:21:00,553 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Submitting application master application_1447928096470_0002
10:21:00,963 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl         - Submitted application application_1447928096470_0002
10:21:00,964 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Waiting for the cluster to be allocated
10:21:00,969 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
10:21:01,973 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
10:21:02,977 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
10:21:03,982 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
10:21:04,986 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
10:21:05,990 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
10:21:06,994 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
10:21:07,996 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
10:21:09,003 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
10:21:10,007 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
10:21:11,011 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
Error while deploying YARN cluster: The YARN application unexpectedly switched to state FAILED during deployment. 
Diagnostics from YARN: Application application_1447928096470_0002 failed 1 times due to Error launching appattempt_1447928096470_0002_000001. Got exception: java.net.ConnectException: Call From flink-master/127.0.0.1 to localhost:38425 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:792)
at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:732)
at org.apache.hadoop.ipc.Client.call(Client.java:1480)
at org.apache.hadoop.ipc.Client.call(Client.java:1407)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
at com.sun.proxy.$Proxy31.startContainers(Unknown Source)
at org.apache.hadoop.yarn.api.impl.pb.client.ContainerManagementProtocolPBClientImpl.startContainers(ContainerManagementProtocolPBClientImpl.java:96)
at org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher.launch(AMLauncher.java:119)
at org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher.run(AMLauncher.java:254)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:744)
at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:495)
at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:609)
at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:707)
at org.apache.hadoop.ipc.Client$Connection.access$2800(Client.java:370)
at org.apache.hadoop.ipc.Client.getConnection(Client.java:1529)
at org.apache.hadoop.ipc.Client.call(Client.java:1446)
... 9 more
. Failing the application.
If log aggregation is enabled on your cluster, use this command to further investigate the issue:
yarn logs -applicationId application_1447928096470_0002
org.apache.flink.yarn.FlinkYarnClientBase$YarnDeploymentException: The YARN application unexpectedly switched to state FAILED during deployment. 
Diagnostics from YARN: Application application_1447928096470_0002 failed 1 times due to Error launching appattempt_1447928096470_0002_000001. Got exception: java.net.ConnectException: Call From flink-master/127.0.0.1 to localhost:38425 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:792)
at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:732)
at org.apache.hadoop.ipc.Client.call(Client.java:1480)
at org.apache.hadoop.ipc.Client.call(Client.java:1407)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
at com.sun.proxy.$Proxy31.startContainers(Unknown Source)
at org.apache.hadoop.yarn.api.impl.pb.client.ContainerManagementProtocolPBClientImpl.startContainers(ContainerManagementProtocolPBClientImpl.java:96)
at org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher.launch(AMLauncher.java:119)
at org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher.run(AMLauncher.java:254)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:744)
at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:495)
at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:609)
at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:707)
at org.apache.hadoop.ipc.Client$Connection.access$2800(Client.java:370)
at org.apache.hadoop.ipc.Client.getConnection(Client.java:1529)
at org.apache.hadoop.ipc.Client.call(Client.java:1446)
... 9 more
. Failing the application.
If log aggregation is enabled on your cluster, use this command to further investigate the issue:
yarn logs -applicationId application_1447928096470_0002
at org.apache.flink.yarn.FlinkYarnClientBase.deployInternal(FlinkYarnClientBase.java:646)
at org.apache.flink.yarn.FlinkYarnClientBase.deploy(FlinkYarnClientBase.java:338)
at org.apache.flink.client.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:409)
at org.apache.flink.client.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:351)



Just to mention, my flink-conf.yaml is the following:
#==============================================================================
# Common
#==============================================================================

# The host on which the JobManager runs. Only used in non-high-availability mode.
# The JobManager process will use this hostname to bind the listening servers to.
# The TaskManagers will try to connect to the JobManager on that host.

jobmanager.rpc.address: master


# The port where the JobManager's main actor system listens for messages.

jobmanager.rpc.port: 6123


# The heap size for the JobManager JVM

jobmanager.heap.mb: 256


# The heap size for the TaskManager JVM

taskmanager.heap.mb: 512


# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.

taskmanager.numberOfTaskSlots: 10


# The parallelism used for programs that did not specify any other parallelism.

parallelism.default: 5


#==============================================================================
# Web Frontend
#==============================================================================

# The port under which the web-based runtime monitor listens.
# A value of -1 deactivates the web server.

jobmanager.web.port: 8081


# The port under which the standalone web client
# (for job upload and submit) listens.

webclient.port: 8080


#==============================================================================
# Streaming state checkpointing
#==============================================================================

# The backend that will be used to store operator state checkpoints if 
# checkpointing is enabled. 
#
# Supported backends: jobmanager, filesystem, <class-name-of-factory> 
#
#state.backend: filesystem


# Directory for storing checkpoints in a Flink-supported filesystem
# Note: State backend must be accessible from the JobManager and all TaskManagers.
# Use "hdfs://" for HDFS setups, "file://" for UNIX/POSIX-compliant file systems,
# (or any local file system under Windows), or "s3://" for the S3 file system.
#
# state.backend.fs.checkpointdir: hdfs://namenode-host:port/flink-checkpoints


#==============================================================================
# Advanced
#==============================================================================

# The number of buffers for the network stack.
#
# taskmanager.network.numberOfBuffers: 2048


# Directories for temporary files.
#
# Add a delimited list for multiple directories, using the system directory
# delimiter (colon ':' on unix) or a comma, e.g.:
#     /data1/tmp:/data2/tmp:/data3/tmp
#
# Note: Each directory entry is read from and written to by a different I/O
# thread. You can include the same directory multiple times in order to create
# multiple I/O threads against that directory. This is for example relevant for
# high-throughput RAIDs.
#
# If not specified, the system-specific Java temporary directory (java.io.tmpdir
# property) is taken.
#
# taskmanager.tmp.dirs: /tmp


# Path to the Hadoop configuration directory.
#
# This configuration is used when writing into HDFS. Unless specified otherwise,
# HDFS file creation will use HDFS default settings with respect to block-size,
# replication factor, etc.
#
# You can also directly specify the paths to hdfs-default.xml and hdfs-site.xml
# via keys 'fs.hdfs.hdfsdefault' and 'fs.hdfs.hdfssite'.
#
fs.hdfs.hadoopconf: /usr/local/hadoop/etc/hadoop/


#==============================================================================
# Master High Availability (required configuration)
#==============================================================================

# The list of ZooKeeper quorum peers that coordinate the high-availability
# setup. This must be a list of the form:
# "host1:clientPort,host2[:clientPort],..." (default clientPort: 2181)
#
# recovery.mode: zookeeper
#
# recovery.zookeeper.quorum: localhost:2181,...
#
# Note: You need to set the state backend to 'filesystem' and the checkpoint
# directory (see above) before configuring the storageDir.
#
# recovery.zookeeper.storageDir: hdfs:///recovery

Thanks in advance,
Stefanos Antaris




Re: flink yarn-session failure

rmetzger0
Hi Stefanos,

the pasted yarn-site.xml file looks fine at first sight. You don't need a yarn-site.xml file for NameNodes or DataNodes; those belong to HDFS.
In YARN, the corresponding components are called ResourceManager and NodeManager.
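
One way to see how those components registered themselves is to ask the ResourceManager. As a quick check (yarn node -list is a standard YARN CLI command; run it on the master):

# Lists all NodeManagers as the ResourceManager sees them. In the trace
# above, the AM launch goes to "localhost:38425", which suggests at least
# one NodeManager registered under localhost instead of its real hostname.
yarn node -list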

You can usually create one yarn-site.xml file and copy it to all machines.
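
As a minimal sketch of that, assuming passwordless SSH and the /usr/local/hadoop install path from your flink-conf.yaml (the host names are placeholders for your 20 VMs):

# Push the master's yarn-site.xml to every worker, then restart the
# NodeManagers so they pick up the new configuration.
for host in slave01 slave02 slave03; do
  scp /usr/local/hadoop/etc/hadoop/yarn-site.xml \
      $host:/usr/local/hadoop/etc/hadoop/yarn-site.xml
  ssh $host "/usr/local/hadoop/sbin/yarn-daemon.sh stop nodemanager; \
             /usr/local/hadoop/sbin/yarn-daemon.sh start nodemanager"
done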

I know the tutorial situation for setting up YARN is not very good. I think most people use the installers from the big Hadoop vendors.





On Thu, Nov 19, 2015 at 12:44 PM, Stefanos Antaris <[hidden email]> wrote:
Yes, you are right. I cannot run any YARN application. However, I have no localhost in my yarn-site.xml.

<configuration>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <property>
        <name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
        <value>org.apache.hadoop.mapred.ShuffleHandler</value>
    </property>
    <property>
        <name>yarn.resourcemanager.resource-tracker.address</name>
        <value>master:8025</value>
    </property>
    <property>
        <name>yarn.resourcemanager.scheduler.address</name>
        <value>master:8030</value>
    </property>
    <property>
        <name>yarn.resourcemanager.address</name>
        <value>master:8050</value>
    </property>
</configuration>


Could someone provide me with a correct yarn-site.xml to make this work? Should the yarn-site.xml be the same on both the NameNode and the DataNodes? Sorry for this question, but different tutorials on Google give different configurations and I am confused.

Thanks,
Stefanos

On 19 Nov 2015, at 12:47, Robert Metzger <[hidden email]> wrote:

The exception is thrown even before any Flink code is executed, so I assume that your YARN setup is not working properly.
Have you tried running any other YARN application on this setup? I suspect that other systems like MapReduce or Spark will not run in this environment either.

Maybe the yarn-site.xml on the NodeManager hosts is not correct (pointing to localhost instead of the master).
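
The diagnostics ("Call From flink-master/127.0.0.1 to localhost:38425") also point to a hostname-resolution problem. A quick check you could run on each host (a sketch; adjust the config path to your installation):

# "localhost" in yarn-site.xml, or the machine's own hostname (here
# flink-master) mapped to 127.0.0.1 in /etc/hosts, would explain the error.
grep -n localhost /usr/local/hadoop/etc/hadoop/yarn-site.xml
getent hosts flink-master    # should print the machine's real IP, not 127.0.0.1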
