Flink CLI cannot submit job to Flink on Mesos

Francisco Gonzalez
Hello,

We're having a lot of issues while trying to submit a job remotely using the Flink CLI. We have tried different configurations, but in all of them we get errors from Akka while trying to connect. I will try to summarise the configurations we've tried.

- Flink 1.3.0 deployed within a Docker container on a Mesos cluster (using Marathon)
- This Flink has the property jobmanager.rpc.address set to a hostname (e.g. something like ip-XXXXXXXXX.eu-west-1.compute.internal)
- The same Flink version is used for the client, which runs remotely (e.g. on my laptop).

When I try to submit the job with flink run -m myHostName:myPort (the same values as in jobmanager.rpc.address and jobmanager.rpc.port), after some time waiting I get the trace at the end of this email. On the Flink side we get this error from Akka:

Association with remote system [akka.tcp://flink@10.203.23.24:24469] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@10.203.23.24:24469]] Caused by: [Connection refused: /10.203.23.24:24469]

After reading a bit, it seems there are some problems related to Akka resolving hostnames to IPs, so we decided to start the same Flink but with jobmanager.rpc.address set to the IP directly (e.g. something like XX.XXX.XX.XX). In this case I get the same trace (at the end of the email) on the client side and this one from the Flink server:

Discard message LeaderSessionMessage(00000000-0000-0000-0000-000000000000,SubmitJob(JobGraph(jobId: b25d5c5ced962632abc5ee9ef867792e),DETACHED)) because the expected leader session ID b4f53899-5d70-467e-8e9d-e56eeb60b6e3 did not equal the received leader session ID 00000000-0000-0000-0000-000000000000.

We have tried a few other things without success. Any clue that could help us?

Thanks in advance!

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: JobManager did not respond within 60000 milliseconds
at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:454)
at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:99)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:76)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:345)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
Caused by: org.apache.flink.runtime.client.JobTimeoutException: JobManager did not respond within 60000 milliseconds
at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:426)
at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:451)
... 15 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [60000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:190)
at scala.concurrent.Await.result(package.scala)
at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:423)
... 16 more



This message is private and confidential. If you have received this message in error, please notify the sender or [hidden email] and remove it from your system.

Piksel Inc is a company registered in the United States, 2100 Powers Ferry Road SE, Suite 400, Atlanta, GA 30339

Re: Flink CLI cannot submit job to Flink on Mesos

Till Rohrmann
Hi Francisco,

Have you set the right high-availability configuration options in your client configuration as described here [1]? If not, Flink cannot find the correct JobManager, because the client retrieves both the JobManager address and a fencing token (the leader session ID) from the HA store (ZooKeeper).


Cheers,
Till
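
For readers following along, here is a minimal sketch of the kind of client-side entries Till is asking about, appended to the flink-conf.yaml that the local CLI reads. The file path, the ZooKeeper hosts and the root path are placeholders, not values from this thread; they would need to match whatever the JobManager on Mesos is configured with.

```shell
#!/bin/sh
# Assumption: FLINK_CONF points at the flink-conf.yaml read by the local CLI.
# The default path below is only a placeholder.
FLINK_CONF="${FLINK_CONF:-./conf/flink-conf.yaml}"
mkdir -p "$(dirname "$FLINK_CONF")"

# Append the ZooKeeper HA entries. The quorum hosts and the root path are
# made-up examples; use the same values as the cluster-side configuration.
cat >> "$FLINK_CONF" <<'EOF'
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1.example.com:2181,zk2.example.com:2181
high-availability.zookeeper.path.root: /flink
EOF
```

If the cluster sets further high-availability.* options, the client most likely needs the same values for those as well.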

On Thu, Jul 27, 2017 at 6:20 PM, Francisco Gonzalez Barea <[hidden email]> wrote:

Re: Flink CLI cannot submit job to Flink on Mesos

Francisco Gonzalez
Hi Till,

Thanks for your answer.

We have reviewed the configuration and everything seems fine on our side, but we're still getting the message:

“Discard message LeaderSessionMessage(00000000-0000-0000-0000-000000000000,SubmitJob(JobGraph(jobId: 041b67c7ef765c2f61bd69c2b9dacbce),DETACHED)) because the expected leader session ID 9e9e4e4b-1236-4140-9156-fd207929aab5 did not equal the received leader session ID 00000000-0000-0000-0000-000000000000.”

The point is that we have another setup using Flink 1.1.3 on YARN, and it works fine. Comparing the configuration values, the main difference I can see (apart from the Mesos/YARN-specific parameters) is that on YARN jobmanager.rpc.address is an IP and on Mesos it is a hostname. Might this be related?

Thanks in advance.


On 28 Jul 2017, at 11:07, Till Rohrmann <[hidden email]> wrote:


Re: Flink CLI cannot submit job to Flink on Mesos

Francisco Gonzalez
In reply to this post by Till Rohrmann
Hi again,

Also, this is the Flink CLI command we are running:

./flink run -d -m ${jobmanager.rpc.address}:${jobmanager.rpc.port}  ${our-program-jar} ${our-program-params}

Maybe we are using the command incorrectly?

Thank you


Re: Flink CLI cannot submit job to Flink on Mesos

Stephan Ewen
Hi Francisco!

Can you drop the explicit address of the jobmanager? The client should pick up that address automatically from ZooKeeper as well (together with the HA leader session ID).

Please check if you have the ZooKeeper HA config entries in the config used by the CLI.

Stephan
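
A small sketch of the check Stephan suggests: verify that the config used by the CLI contains the ZooKeeper HA entries before submitting without -m. The function name check_ha_config and any paths are hypothetical; the two keys checked are the high-availability options discussed in this thread.

```shell
#!/bin/sh
# check_ha_config FILE
# Succeeds only if FILE sets high-availability to zookeeper and
# gives a non-empty ZooKeeper quorum. (Hypothetical helper, not part of Flink.)
check_ha_config() {
    conf="$1"
    grep -Eq '^high-availability:[[:space:]]*zookeeper[[:space:]]*$' "$conf" &&
    grep -Eq '^high-availability\.zookeeper\.quorum:[[:space:]]*[^[:space:]]+' "$conf"
}
```

If check_ha_config ./conf/flink-conf.yaml succeeds, the submission would then drop the -m option, for example ./flink run -d ${our-program-jar} ${our-program-params}, and let the client look up the JobManager address and leader session ID in ZooKeeper.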


On Mon, Jul 31, 2017 at 6:27 PM, Francisco Gonzalez Barea <[hidden email]> wrote:

Re: Flink CLI cannot submit job to Flink on Mesos

Francisco Gonzalez
In reply to this post by Francisco Gonzalez
Hi Stephan,

So, do you mean I should remove the “-m” param from the Flink CLI call? And also that I should add the ZooKeeper configuration on both sides, in the remote Flink and locally in the Flink CLI config, right?

Regards


On 31 Jul 2017, at 22:21, Stephan Ewen <[hidden email]> wrote:

Hi Francisco!

Can you drop the explicit address of the jobmanager? The client should pick up that address automatically from ZooKeeper as well (together with the HA leader session ID).

Please check if you have the ZooKeeper HA config entries in the config used by the CLI.

Stephan


On Mon, Jul 31, 2017 at 6:27 PM, Francisco Gonzalez Barea <[hidden email]> wrote:
Hi again,

On the other hand, we are running the following flink CLI command:

./flink run -d -m ${jobmanager.rpc.address}:${jobmanager.rpc.port}  ${our-program-jar} ${our-program-params}

Maybe is the command what we are using wrongly?

Thank you

On 28 Jul 2017, at 11:07, Till Rohrmann <[hidden email]> wrote:

Hi Francisco,

have you set the right high-availability configuration options in your client configuration as described here [1]? If not, then Flink is not able to find the correct JobManager because it retrieves the address as well as a fencing token (called leader session id) from the HA store (ZooKeeper).


Cheers,
Till

On Thu, Jul 27, 2017 at 6:20 PM, Francisco Gonzalez Barea <[hidden email]> wrote:
Hello,

We´re having lot of issues while trying to submit a job remotely using the Flink CLI command line tool. We have tried different configurations but in all of them we get errors from AKKA while trying to connect. I will try to summarise the configurations we´ve tried.

- Flink 1.3.0 deployed within a docker container on a Mesos cluster (using Marathon)
- This flink has the property jobmanager.rpc.address as a hostname (i.e. kind of ip-XXXXXXXXX.eu.west-1.compute.internal)
- Use the same version for Flink Client remotely (e.g. in my laptop).

When I try to submit the job using the command flink run -m myHostName:myPort (the same in jobmanager.rpc.address and jobmanager.rpc.port) after some time waiting I get the trace at the end of this email. In the flink side we get this error from AKKA: 

Association with remote system [akka.tcp://flink@10.203.23.24:24469] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@10.203.23.24:24469]] Caused by: [Connection refused: /10.203.23.24:24469]

After reading a bit, it seems there´re some problems related to akka resolving hostnames to ips, so we decided to startup the same flink but changing jobmanager.rpc.address to have the direct ip (i.e. kind of XX.XXX.XX.XX). In this case I´m getting same trace (at the end of the email) from the client side and this one from the Flink server:

Discard message LeaderSessionMessage(00000000-0000-0000-0000-000000000000,SubmitJob(JobGraph(jobId: b25d5c5ced962632abc5ee9ef867792e),DETACHED)) because the expected leader session ID b4f53899-5d70-467e-8e9d-e56eeb60b6e3 did not equal the received leader session ID 00000000-0000-0000-0000-000000000000.

We have tried some other stuff but without success… any clue that could help us?

Thanks in advance!

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: JobManager did not respond within 60000 milliseconds
at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:454)
at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:99)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:76)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:345)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
Caused by: org.apache.flink.runtime.client.JobTimeoutException: JobManager did not respond within 60000 milliseconds
at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:426)
at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:451)
... 15 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [60000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:190)
at scala.concurrent.Await.result(package.scala)
at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:423)
... 16 more



This message is private and confidential. If you have received this message in error, please notify the sender or [hidden email] and remove it from your system.

Piksel Inc is a company registered in the United States, 2100 Powers Ferry Road SE, Suite 400, Atlanta, GA 30339






Re: Flink CLI cannot submit job to Flink on Mesos

Francisco Gonzalez
Hey! It's working now!!

I will do a summary for those who might have the same problem in the future:

- Flink 1.3.0 dockerized on Mesos:

- Flink CLI 1.3.0 on my local machine (make sure you use the same version!!)


With those steps, my ./flink run command is working like a charm.

Thank you very much guys!

Regards,
Francisco




On 1 Aug 2017, at 10:24, Francisco Gonzalez Barea <[hidden email]> wrote:

Hi Stephan,

So, do you mean I should remove the “-m” param from the flink CLI call? And also that I should add the ZooKeeper configuration on both sides, in the remote Flink and locally in the Flink CLI config, right?

Regards


On 31 Jul 2017, at 22:21, Stephan Ewen <[hidden email]> wrote:

Hi Francisco!

Can you drop the explicit address of the jobmanager? The client should pick up that address automatically from ZooKeeper as well (together with the HA leader session ID).

Please check if you have the ZooKeeper HA config entries in the config used by the CLI.
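A minimal sketch of that check, assuming the client reads a flat "key: value" flink-conf.yaml and that the relevant keys are high-availability and high-availability.zookeeper.quorum (key names as I remember them from the Flink HA documentation of that era; please verify them against your version). The helper names are made up for illustration and are not part of Flink.

```python
# Keys the client-side flink-conf.yaml would need for a ZooKeeper lookup.
# Assumption: key names as documented for Flink 1.2/1.3; verify for your version.
REQUIRED_HA_KEYS = (
    "high-availability",
    "high-availability.zookeeper.quorum",
)


def parse_flink_conf(text):
    """Parse the flat 'key: value' lines of a flink-conf.yaml into a dict."""
    conf = {}
    for raw in text.splitlines():
        line = raw.split("#", 1)[0].strip()  # drop comments and whitespace
        if not line or ":" not in line:
            continue
        key, value = line.split(":", 1)  # split once, values may contain ':'
        conf[key.strip()] = value.strip()
    return conf


def missing_ha_settings(conf):
    """Return a list of problems that would keep the CLI from using ZooKeeper."""
    problems = [key for key in REQUIRED_HA_KEYS if not conf.get(key)]
    if conf.get("high-availability", "").lower() not in ("", "zookeeper"):
        problems.append("high-availability is not 'zookeeper'")
    return problems
```

Running missing_ha_settings(parse_flink_conf(open(path).read())) against the config directory used by the CLI should return an empty list before the "-m" option is dropped.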

Stephan


On Mon, Jul 31, 2017 at 6:27 PM, Francisco Gonzalez Barea <[hidden email]> wrote:
Hi again,

On the other hand, we are running the following flink CLI command:

./flink run -d -m ${jobmanager.rpc.address}:${jobmanager.rpc.port}  ${our-program-jar} ${our-program-params}

Maybe we are using the command wrongly?

Thank you

On 28 Jul 2017, at 11:07, Till Rohrmann <[hidden email]> wrote:

Hi Francisco,

have you set the right high-availability configuration options in your client configuration as described here [1]? If not, then Flink is not able to find the correct JobManager because it retrieves the address as well as a fencing token (called leader session id) from the HA store (ZooKeeper).
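To illustrate the fencing idea, here is a toy model in Python. It is not Flink code: ToyJobManager and DEFAULT_SESSION_ID are hypothetical names, and the only behaviour modelled is that a message carrying the wrong leader session ID is dropped.

```python
import uuid

# Toy model of leader-session fencing; this is not Flink code.
# Assumption: a client that never looked up the leader sends an all-zero ID.
DEFAULT_SESSION_ID = uuid.UUID(int=0)


class ToyJobManager:
    def __init__(self):
        # A fresh session ID is generated when leadership is granted.
        self.leader_session_id = uuid.uuid4()
        self.accepted = []

    def receive(self, session_id, payload):
        """Accept the message only if it carries the current fencing token."""
        if session_id != self.leader_session_id:
            return False  # discarded; the sender gets no reply and times out
        self.accepted.append(payload)
        return True
```

A client that skips the HA store and sends DEFAULT_SESSION_ID gets False back from receive, which would match a submission that is silently dropped and then times out on the client side.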


Cheers,
Till

On Thu, Jul 27, 2017 at 6:20 PM, Francisco Gonzalez Barea <[hidden email]> wrote:
Hello,

We're having a lot of issues while trying to submit a job remotely using the Flink CLI. We have tried different configurations, but in all of them we get errors from Akka while trying to connect. I will try to summarise the configurations we've tried.

- Flink 1.3.0 deployed within a Docker container on a Mesos cluster (using Marathon)
- This Flink instance has the property jobmanager.rpc.address set to a hostname (something like ip-XXXXXXXXX.eu.west-1.compute.internal)
- The same version of the Flink client is used remotely (e.g. on my laptop).

When I try to submit the job using the command flink run -m myHostName:myPort (the same values as in jobmanager.rpc.address and jobmanager.rpc.port), after some time waiting I get the trace at the end of this email. On the Flink side we get this error from Akka:

Association with remote system [akka.tcp://flink@10.203.23.24:24469] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@10.203.23.24:24469]] Caused by: [Connection refused: /10.203.23.24:24469]

After reading a bit, it seems there are some problems related to Akka resolving hostnames to IPs, so we decided to start up the same Flink but with jobmanager.rpc.address set to the IP directly (something like XX.XXX.XX.XX). In this case I'm getting the same trace (at the end of the email) on the client side and this one from the Flink server:

Discard message LeaderSessionMessage(00000000-0000-0000-0000-000000000000,SubmitJob(JobGraph(jobId: b25d5c5ced962632abc5ee9ef867792e),DETACHED)) because the expected leader session ID b4f53899-5d70-467e-8e9d-e56eeb60b6e3 did not equal the received leader session ID 00000000-0000-0000-0000-000000000000.
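A small sketch for reading that log line, in case it helps others triage the same symptom: it pulls out the expected and received leader session IDs and flags an all-zero received ID. The function name is hypothetical, the regular expression is matched only against the wording shown above, and treating the all-zero ID as "the client never looked up the leader" is my assumption.

```python
import re
import uuid

UUID_RE = r"[0-9a-f]{8}(?:-[0-9a-f]{4}){3}-[0-9a-f]{12}"
DISCARD_RE = re.compile(
    r"expected leader session ID (" + UUID_RE + r") did not equal "
    r"the received leader session ID (" + UUID_RE + r")"
)


def diagnose_discard(log_line):
    """Return (expected, received, received_is_all_zero), or None if no match."""
    match = DISCARD_RE.search(log_line)
    if match is None:
        return None
    expected, received = (uuid.UUID(group) for group in match.groups())
    # Assumption: an all-zero ID means the sender used a default value
    # instead of one retrieved from the HA store.
    return expected, received, received.int == 0
```

For the line above, the third element of the result is True, which points at the client configuration rather than at the network.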

We have tried some other stuff but without success… any clue that could help us?

Thanks in advance!

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: JobManager did not respond within 60000 milliseconds
at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:454)
at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:99)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:76)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:345)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
Caused by: org.apache.flink.runtime.client.JobTimeoutException: JobManager did not respond within 60000 milliseconds
at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:426)
at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:451)
... 15 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [60000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:190)
at scala.concurrent.Await.result(package.scala)
at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:423)
... 16 more










Re: Flink CLI cannot submit job to Flink on Mesos

Stephan Ewen
Cool, good to hear!

It is one of those "it's a feature, not a bug" situations ;-)

Flink's HA mode supports multiple masters, so the CLI needs to have a way to find which master is "leader" (active, versus the passive masters on standby). That discovery goes through ZooKeeper as well (which is the ground truth for who is the leader).
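As a rough illustration of that discovery step, the following toy model replaces ZooKeeper with an in-memory registry. ToyLeaderRegistry, resolve_submit_target and the master-a / master-b addresses are all hypothetical; the point is only that the client asks the registry for the current leader instead of relying on a fixed address.

```python
import uuid


class ToyLeaderRegistry:
    """Stands in for the HA store (ZooKeeper in this thread); not a real client."""

    def __init__(self):
        self._leader = None  # (address, session_id) of the active master

    def grant_leadership(self, address):
        # Each grant produces a new session ID, so stale senders can be fenced.
        self._leader = (address, uuid.uuid4())
        return self._leader

    def current_leader(self):
        if self._leader is None:
            raise LookupError("no leader elected yet")
        return self._leader


def resolve_submit_target(registry):
    """What a client would do instead of using a fixed -m address."""
    address, session_id = registry.current_leader()
    return {"address": address, "leader_session_id": session_id}
```

If leadership moves from one master to another, the next call to resolve_submit_target returns the new address and a new session ID, while a hard-coded address would keep pointing at the old master.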

Stephan

