accessing flink HA cluster with scala shell/zeppelin notebook

Maciek Próchniak
Hi,

I have a standalone Flink cluster configured for HA (i.e. with
ZooKeeper recovery). How should I access it remotely, e.g. from a Zeppelin
notebook or the Scala shell?

There are settings for host/port, but in an HA setup they are not fixed.
If I look up the host and port of the *current leader* and set those, I get
the following warning in the job manager log:

20:36:38.237 [flink-akka.actor.default-dispatcher-22704] WARN
o.a.f.runtime.jobmanager.JobManager - Discard message
LeaderSessionMessage(null,SubmitJob(JobGraph(jobId:
02a3a43464b2a750e04855d4c0b6fecb),EXECUTION_RESULT_AND_STATE_CHANGES))
because the expected leader session ID
Some(1a4e9d39-2c59-45bb-b81c-d867bec1958d) did not equal the received
leader session ID None.

I guess that is reasonable behaviour, since I should be using an appropriate
LeaderRetrievalService and so on. But apparently there is no way to do
that in the Flink Scala shell?
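
For illustration, here is a minimal Java sketch of the check that the warning above suggests. It is a toy model, not Flink's actual code: the class and method names (LeaderSessionDemo, accept) are made up, and the only assumption is that the leading JobManager compares the session ID carried by each message with its own and discards the message on a mismatch.

```java
import java.util.Optional;
import java.util.UUID;

// Toy model of the check implied by the log line above; NOT Flink's actual code.
// All names here (LeaderSessionDemo, accept) are hypothetical.
class LeaderSessionDemo {

    // A message is accepted only if it carries the leader's current session ID.
    static boolean accept(Optional<UUID> expected, Optional<UUID> received) {
        return expected.equals(received);
    }

    public static void main(String[] args) {
        // Session ID taken from the log line above.
        Optional<UUID> leader =
                Optional.of(UUID.fromString("1a4e9d39-2c59-45bb-b81c-d867bec1958d"));

        // Client pointed directly at host/port: it never learned the session ID.
        System.out.println(accept(leader, Optional.empty())); // prints false

        // Client that first looked up the leader and its session ID.
        System.out.println(accept(leader, leader)); // prints true
    }
}
```

Under that assumption, a client that is only given a host and port has no session ID to send, which would explain why the submission is discarded even though it reaches the current leader.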

Is it a missing feature? I can prepare a patch, but I'm not sure how I would
hook the behaviour of ClusterClient into FlinkILoop.

thanks,

maciek

Re: accessing flink HA cluster with scala shell/zeppelin notebook

Aljoscha Krettek
[hidden email], do you know what can be used to access an HA cluster in that setting?

Adding Till since he probably knows the HA stuff best.

Re: accessing flink HA cluster with scala shell/zeppelin notebook

Alexis Gendronneau
Hello users,

Like Maciek, I'm currently trying to get Apache Zeppelin 0.7 working with Flink. I have two versions of Flink available (1.1.2 and 1.2.0), each running in high-availability mode.

When running jobs from Zeppelin in Flink local mode, everything works fine. But when I try to submit a job to a remote host (regardless of the Flink version), the job is stuck in the submitting phase until it reaches akka.client.timeout.

I tried increasing the timeout (as suggested by the error raised in Zeppelin), but that only increases the time before the error is finally raised (tested with 600s).

On the Flink side, nothing appears except:

    2017-03-20 11:19:31,675 WARN  org.apache.flink.runtime.jobmanager.JobManager - Discard message LeaderSessionMessage(null,SubmitJob(JobGraph(jobId: 8af3a91762a171b04c4be0efe540f3d4) ,EXECUTION_RESULT_AND_STATE_CHANGES))
    because the expected leader session ID Some(f955760c-d80d-4992-a148-5968026ca6e4) did not equal the received leader session ID None.


On the Zeppelin interpreter side, we get the following stack trace:

    bestCarrier: org.apache.flink.api.scala.DataSet[CarrierFlightsCount] = org.apache.flink.api.scala.DataSet@669fc812
    org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Communication with JobManager failed: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
      at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:409)
      at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:95)
      at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:382)
      at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:369)
      at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:344)
      at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:211)
      at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:188)
      at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:172)
      at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896)
      at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:637)
      at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:547)
      ... 36 elided
    Caused by: org.apache.flink.runtime.client.JobExecutionException: Communication with JobManager failed: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
      at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:137)
      at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:405)
      ... 46 more
    Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
      at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:264)
      at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:90)
      at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:70)
      at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
      at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
      at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
      at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
      at akka.actor.ActorCell.invoke(ActorCell.scala:487)
      at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
      at akka.dispatch.Mailbox.run(Mailbox.scala:221)
      at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
      at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
      at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
      at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

It looks like we have to add parameters on the Zeppelin side, but I can't see what's missing here. Any clue appreciated.

Regards,

Re: accessing flink HA cluster with scala shell/zeppelin notebook

rmetzger0
Hi Alexis,

did you set the ZooKeeper configuration for Flink in Zeppelin?

Re: accessing flink HA cluster with scala shell/zeppelin notebook

Till Rohrmann

Hi Maciek and Alexis,

as far as I can tell, it is currently not possible to use Zeppelin with a Flink cluster running in HA mode. To make it work, it would be necessary either to specify a Flink configuration for the Flink interpreter (this is probably the most general solution) or to enable HA mode in Zeppelin. Enabling HA mode would mean setting high-availability: zookeeper in the configuration and then setting all the remaining high-availability configuration options [1] to the same values with which the Flink cluster was started. This would have to be contributed to the Zeppelin project.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#high-availability-ha
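
The requirement above, that the client-side HA options carry the same values the cluster was started with, can be sketched as a simple comparison. This is only an illustration in plain Java, not something Zeppelin or Flink provides: the class HaSettingsCheck and its method are hypothetical, the host names are placeholders, and which high-availability.* keys apply depends on the Flink version, so the list in [1] should be checked.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical helper, not part of Flink or Zeppelin.
class HaSettingsCheck {

    // Returns the high-availability keys whose client value differs from the cluster value.
    static List<String> mismatches(Map<String, String> cluster, Map<String, String> client) {
        List<String> diff = new ArrayList<>();
        for (Map.Entry<String, String> e : cluster.entrySet()) {
            boolean isHaKey = e.getKey().startsWith("high-availability");
            if (isHaKey && !Objects.equals(e.getValue(), client.get(e.getKey()))) {
                diff.add(e.getKey());
            }
        }
        return diff;
    }

    public static void main(String[] args) {
        // Values the cluster was started with (placeholder hosts and path).
        Map<String, String> cluster = new LinkedHashMap<>();
        cluster.put("high-availability", "zookeeper");
        cluster.put("high-availability.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");
        cluster.put("high-availability.zookeeper.path.root", "/flink");

        // Client that only knows a fixed host and port: every HA key is missing.
        Map<String, String> client = new LinkedHashMap<>();
        client.put("jobmanager.rpc.address", "flink-master-1");
        client.put("jobmanager.rpc.port", "6123");
        System.out.println(mismatches(cluster, client)); // lists all three HA keys

        // Client that copies the cluster's HA settings: nothing left to fix.
        client.putAll(cluster);
        System.out.println(mismatches(cluster, client)); // prints []
    }
}
```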

Cheers,
Till


Re: accessing flink HA cluster with scala shell/zeppelin notebook

Alexis Gendronneau
Hi Robert, Hi Till,

I tried to set up the high-availability options in Zeppelin, but I guess it's just a matter of Flink version compatibility on the Zeppelin side. I'll try to compile Zeppelin against Flink 1.2 and add the needed parameters to see if that works better. Thanks for your help!

Re: accessing flink HA cluster with scala shell/zeppelin notebook

santoshg
Hi Alexis,

Were you able to make this work? I am also looking into Zeppelin integration
with Flink, and this might be helpful.

Thanks
Santosh



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/