Error running Flink job in Yarn-cluster mode


Biplob Biswas
I am trying to run a Flink job on our cluster with 3 dn and 1 nn. I am using the following command line to run this job, but I get an exception saying "Could not connect to the leading JobManager. Please check that the JobManager is running". What could I be doing wrong?
Surprisingly, the YARN UI shows that the job finished and succeeded, which is completely false.

Can anyone shed some light on this?

 ./bin/flink run -m yarn-cluster -yn 4 -yt ./lib -c CEPForBAMOther ~/flinkCEPJob/FlinkCEP-1.0-SNAPSHOT.jar
2017-06-08 14:58:45,247 INFO  org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl     - Timeline service address: http://airpluspoc-hdp-mn1.germanycentral.cloudapp.microsoftazure.de:8188/ws/v1/timeline/
2017-06-08 14:58:45,372 INFO  org.apache.hadoop.yarn.client.RMProxy                         - Connecting to ResourceManager at airpluspoc-hdp-mn1.germanycentral.cloudapp.microsoftazure.de/172.16.6.31:8050
2017-06-08 14:58:45,883 WARN  org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory       - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2017-06-08 14:58:51,296 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl         - Submitted application application_1496901009273_0048
Cluster started: Yarn cluster with application id application_1496901009273_0048
Using address airpluspoc-hdp-dn2.germanycentral.cloudapp.microsoftazure.de:36661 to connect to JobManager.
JobManager web interface address http://airpluspoc-hdp-mn1.germanycentral.cloudapp.microsoftazure.de:8088/proxy/application_1496901009273_0048/
Using the parallelism provided by the remote cluster (4). To use another parallelism, set it at the ./bin/flink client.
Starting execution of program
RecordReadEventType
Test String
Waiting until all TaskManagers have connected

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:545)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:381)
        at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:838)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:259)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1086)
        at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1133)
        at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1130)
        at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1129)
Caused by: java.lang.RuntimeException: Unable to get ClusterClient status from Application Client
        at org.apache.flink.yarn.YarnClusterClient.getClusterStatus(YarnClusterClient.java:243)
        at org.apache.flink.yarn.YarnClusterClient.waitForClusterToBeReady(YarnClusterClient.java:507)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:454)
        at org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:205)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:71)
        at CEPForBAMOther.main(CEPForBAMOther.java:134)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
        ... 13 more
Caused by: org.apache.flink.util.FlinkException: Could not connect to the leading JobManager. Please check that the JobManager is running.
        at org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:789)
        at org.apache.flink.yarn.YarnClusterClient.getClusterStatus(YarnClusterClient.java:237)
        ... 24 more
Caused by: org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Could not retrieve the leader gateway.
        at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:79)
        at org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:784)
        ... 25 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:190)
        at scala.concurrent.Await.result(package.scala)
        at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:77)
        ... 26 more



Regards,
Biplob
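
When reading a trace like the one above, the innermost `Caused by:` line is usually the most specific one. Here it is the 10-second timeout while retrieving the leader gateway, which suggests the client never reached the JobManager. A minimal sketch for pulling that line out, assuming the client output was saved to a file (the file name `client.log` and the helper name `root_cause` are hypothetical, not part of Flink):

```shell
# Print the innermost "Caused by:" line of a saved Flink client log.
# Assumes the output was captured beforehand, for example with:
#   ./bin/flink run ... 2>&1 | tee client.log
root_cause() {
  # Each nested cause starts at column 0 with "Caused by:";
  # the last one in the file is the deepest cause.
  grep '^Caused by:' "$1" | tail -n 1
}
```

Usage would be `root_cause client.log`. The JobManager's own logs are the natural next place to look; with YARN log aggregation enabled they can be fetched with `yarn logs -applicationId application_1496901009273_0048`.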
Re: Error running Flink job in Yarn-cluster mode

Nico Kruber
I'm no expert here, but aren't 4 YARN containers/TaskManagers (-yn 4) too
many for 3 data nodes (= 3 dn?)?

Also, isn't the YARN UI reflecting its own jobs, i.e. the running Flink cluster,
as opposed to the actual Flink job? Or did you mean that the Flink web UI
(through YARN) showed the submitted job as succeeded?


Nico

On Thursday, 8 June 2017 16:49:37 CEST Biplob Biswas wrote:

> [...]


Re: Error running Flink job in Yarn-cluster mode

Biplob Biswas
Hi Nico,

I tried running my job with 3 and even 2 YARN containers and the result is the same. Then I tried running the example WordCount (both streaming and batch), and those find the TaskManagers and JobManager and run successfully.

./bin/flink run -m yarn-cluster -yn 3 -yt ./lib ./examples/streaming/WordCount.jar --> works
./bin/flink run -m yarn-cluster -yn 3 -yt ./lib -c CEPForBAMOther ~/flinkCEPJob/FlinkCEP-1.0-SNAPSHOT.jar --> doesn't work

Does anyone have any input here?

Regards,
Biplob
Re: Error running Flink job in Yarn-cluster mode

Biplob Biswas
One more thing I noticed: the streaming WordCount from the Flink package works when I run it, but when I took the same code from GitHub, packaged it, and uploaded the fat jar with the WordCount example to the cluster, I got the same error.

I am wondering how building my own uber jar can produce such an erroneous result when I am basically not changing anything else.
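
One thing that can differ between the packaged example and a self-built uber jar is whether the jar bundles classes the cluster already provides, such as Flink's runtime or Akka. The thread does not confirm that this was the cause here, so the following is only a diagnostic sketch; the function name `bundled_runtime_entries` is hypothetical, and it reads a jar listing (as printed by `jar tf`) on standard input:

```shell
# Filter a jar listing for entries that normally ship with the Flink
# distribution itself and are usually left out of a job's fat jar.
# Prints matching entries; prints nothing if none are found.
bundled_runtime_entries() {
  # "|| true" keeps the exit status zero when grep finds no match.
  grep -E '^(org/apache/flink/runtime/|akka/)' || true
}
```

For the jar from this thread, that would be `jar tf ~/flinkCEPJob/FlinkCEP-1.0-SNAPSHOT.jar | bundled_runtime_entries`. Library classes such as those under `org/apache/flink/cep/` are expected in the jar and are deliberately not matched.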
Re: Error running Flink job in Yarn-cluster mode

Aljoscha Krettek
Hi,

What version of Flink are you using? Also, how are you building your job? Is it a fat jar, maybe based on the Flink Quickstart project?

Best,
Aljoscha

> On 9. Jun 2017, at 11:49, Biplob Biswas <[hidden email]> wrote:
> [...]

Re: Error running Flink job in Yarn-cluster mode

Biplob Biswas
Thanks a lot, Aljoscha (again).

I recreated my project from scratch using the flink-maven-archetype, and now it works in yarn-cluster mode. I was also building a fat jar with my old project setup, so I am not really sure what went wrong there, as it was working in my local test environment.

You have been a life-saver so many times now. :)

Thanks & Regards
Biplob
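
For reference, a project skeleton can be generated from the Flink quickstart archetype roughly as follows. This is only a sketch: the thread never states the Flink version, so the `1.3.0` default is an assumption, the wrapper function name is hypothetical, and the coordinates are those of `flink-quickstart-java`, which is presumably what "flink-maven-archetype" refers to.

```shell
# Hypothetical wrapper around the Maven archetype call.
# Pass the Flink version that matches the cluster as the first argument.
generate_flink_project() {
  version="${1:-1.3.0}"   # assumed default; not stated in the thread
  mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion="$version"
}
```

Calling `generate_flink_project 1.3.0` prompts for the new project's groupId and artifactId. The generated pom.xml is set up so that the packaged job jar leaves out the core Flink dependencies that the cluster already provides.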
Re: Error running Flink job in Yarn-cluster mode

Aljoscha Krettek
You’re welcome. 😃

> On 13. Jun 2017, at 10:24, Biplob Biswas <[hidden email]> wrote:
> [...]