Hi,
Is my-machine:52650 the correct address for the JobManager running in YARN? Also, Queryable State in Flink 1.3.2 is not easy to get to work when you use YARN with HA mode.
Best,
Aljoscha
Hi,
I am running a Flink Job which uses the Queryable State feature of Apache Flink(1.3.2). I was able to do that in local mode. When I try to do that in Cluster mode (Yarn Session), I am getting Actor not found Exception.
Please help me to understand what is missing.
Exception Trace
Query failed because of the following Exception:
akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.<a href="tcp://flink@my-machine:52650/" class="">tcp://flink@my-machine:52650/), Path(/user/jobmanager)]
at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73)
at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:120)
at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
at java.lang.Thread.run(Thread.java:745)
Client Creation Snippet
Configuration config = new Configuration();
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerHost);
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils
.createHighAvailabilityServices(config, Executors.newSingleThreadScheduledExecutor(),
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
this.client = new QueryableStateClient(config, highAvailabilityServices);