Using port ranges to connect with the Flink Client

Using port ranges to connect with the Flink Client

Gyula Fóra
Hi!

We have been running Flink on YARN for quite some time, and historically we have specified port ranges so that the client can access the cluster:

yarn.application-master.port: 100-200

Now we have updated to Flink 1.7 and are trying to migrate away from the legacy execution mode, but we have run into a problem: we cannot connect to the running job from the command-line client like before.

What is the equivalent port configuration that would make sure the ports that need to be accessible from the client land between 100 and 200?

Thanks,
Gyula

Re: Using port ranges to connect with the Flink Client

Gyula Fóra
I get the following error when trying to take a savepoint of a job, for example:

 The program finished with the following exception:

org.apache.flink.util.FlinkException: Could not connect to the leading JobManager. Please check that the JobManager is running.
at org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:960)
at org.apache.flink.client.program.ClusterClient.triggerSavepoint(ClusterClient.java:737)
at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:771)
at org.apache.flink.client.cli.CliFrontend.lambda$checkpoint$10(CliFrontend.java:760)
at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1044)
at org.apache.flink.client.cli.CliFrontend.checkpoint(CliFrontend.java:759)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1127)
at org.apache.flink.client.cli.CliFrontend.lambda$main$12(CliFrontend.java:1188)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1656)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1188)
Caused by: org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Could not retrieve the leader gateway.
at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:83)
at org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:955)
... 12 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [20000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:190)
at scala.concurrent.Await.result(package.scala)
at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:81)
... 13 more

There is no error when trying the same operation with the 1.7 client on a 1.6 (legacy execution) job. This looks like a firewall issue, so I'm trying to pin the ports to the open ranges, but I'm not sure what I have to change.
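
(For context, the failing operation is the standard CLI savepoint trigger against the YARN session, roughly the line below; the job id and YARN application id are placeholders.)

# Rough sketch of the command that produces the exception above; ids are placeholders.
./bin/flink savepoint <jobId> hdfs:///flink/savepoints -yid <yarnApplicationId>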

Gyula

Re: Using port ranges to connect with the Flink Client

Gyula Fóra
Ah, it seems to be something with the custom Flink client build that we run...

I still don't know why, but if I use the normal client once the job is started, it works.

Gyula

Re: Using port ranges to connect with the Flink Client

Joshua Fan
Hi, Gyula

I ran into a similar situation.

We used Flink 1.4 before, and everything was fine.

Now we have upgraded to Flink 1.7 and use the non-legacy mode, and something seems wrong: it is impossible to get the JobManager gateway on the client side. When I create a cluster without a job and then describe the cluster, Flink throws the same exception you pointed out. When I submit a job and then want to trigger a savepoint from the client side, it also throws the same exception.

I don't know why, but in non-legacy mode Flink does not write the leader info back into ZooKeeper under the path /flink/app_99999_000/leader/000000000/job_manager_lock. This causes every operation that goes through the getJobManagerGateway method in ClusterClient to fail.

I hope someone can explain how to do this in non-legacy mode.

Yours sincerely
Joshua
Reply | Threaded
Open this post in threaded view
|

Re: Using port ranges to connect with the Flink Client

Chesnay Schepler
@Gyula: From what I can tell, your custom client is still relying on Akka; it should be using the RestClusterClient instead.

@Joshua: Are you by chance using the ClusterClient directly? Unless you're working with legacy clusters, for 1.5+ you should use the RestClusterClient instead.
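
For illustration, a rough sketch of triggering a savepoint through the REST-based client (untested; host, port, cluster id and job id are placeholders, and the exact signatures should be checked against the Flink 1.7 javadocs):

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;

public class TriggerSavepointSketch {
    public static void main(String[] args) throws Exception {
        // Point the client at the REST endpoint of the cluster; values are placeholders.
        Configuration conf = new Configuration();
        conf.setString("rest.address", "jobmanager-host");
        conf.setInteger("rest.port", 8081);

        // The second argument is an opaque "cluster id"; a plain string keeps the
        // sketch self-contained (for a YARN session it would normally be the application id).
        RestClusterClient<String> client = new RestClusterClient<>(conf, "cluster-id");
        try {
            JobID jobId = JobID.fromHexString("<job-id-hex>");  // placeholder job id
            // triggerSavepoint goes through the REST API rather than the Akka gateway.
            String savepointPath =
                    client.triggerSavepoint(jobId, "hdfs:///flink/savepoints").get();
            System.out.println("Savepoint stored at: " + savepointPath);
        } finally {
            client.shutdown();  // later versions use close() instead
        }
    }
}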

Re: Using port ranges to connect with the Flink Client

Gyula Fóra
Hi,

Thanks Chesnay, my problem is fixed; it turned out to be related to enabling port ranges for the REST client.
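
For reference, a rough flink-conf.yaml sketch of the kind of setting involved (whether a port range is accepted here depends on the Flink version and build, so check the configuration docs for the release in use):

# In non-legacy mode the CLI talks to the cluster over the REST endpoint,
# so that is the port which has to be reachable from the client side.
rest.port: 8081             # single-port form
# rest.bind-port: 100-200   # range form, where the version/build supports port ranges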

Gyula

Re: Using port ranges to connect with the Flink Client

Joshua Fan
Hi Chesnay

Yes, RestClusterClient is used in our company with Flink 1.7. It can do almost everything except get the ClusterOverview when I want summary information on a session cluster. In the end, I manually trigger an HTTP GET request to the cluster to do that. It would be good if RestClusterClient could provide a similar interface.
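
For what it's worth, a minimal sketch of that manual request (host and port are placeholders; the /overview path is the REST endpoint that backs the web UI's cluster summary):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class ClusterOverviewSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder JobManager REST address.
        URL url = new URL("http://jobmanager-host:8081/overview");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        try (BufferedReader reader =
                     new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            // The response is a small JSON document with task manager, slot and job counts.
            StringBuilder body = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line);
            }
            System.out.println(body);
        } finally {
            conn.disconnect();
        }
    }
}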

Yours 
Joshua
