Why do I get this error when instantiating an akka ActorSystem for the Flink-JobManager?

Felipe Gutierrez
Hi all,

I have a PoC running on Flink in which I start a thread that runs
asynchronously on the JobManager. I create and start this thread from
the class JobManagerRunnerImpl [1] (in the flink-runtime module).

I want to replace this thread with an akka-remote process, since Flink
already uses this library to exchange messages between the JM and the
TMs. When I use either of the following two lines to create the Akka
ActorSystem in this class, I get an error.

ActorSystem system = ActorSystem.create("mySystem", config);
ActorSystem system = AkkaUtils.createActorSystem("mySystem", config);

The error is java.lang.ClassNotFoundException:
org.agrona.collections.Long2ObjectHashMap. I searched for this class,
and its package (agrona) comes from aeron-udp. However, I am using TCP,
and I create the ActorSystem with this configuration:

akka {
  actor {
    provider = remote
    allow-java-serialization = on
  }

  remote {
    use-unsafe-remote-features-outside-cluster = true
    artery {
      enabled = on
      transport = tcp
      canonical.hostname = "localhost"
      canonical.port = 2552
    }
  }
}
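
Since the failure is a ClassNotFoundException, one quick check is whether the class is visible to the class loader at all, independent of Akka. The following is only a minimal sketch using the plain JDK; the class name ClasspathCheck and the helper isOnClasspath are hypothetical names chosen for illustration, and the check would have to run with the same classpath as the JobManager to be meaningful.

```java
final class ClasspathCheck {

    /**
     * Returns true if the named class can be located by the given class loader.
     * The class is looked up without being initialized, so no static
     * initializers run as a side effect of the check.
     */
    static boolean isOnClasspath(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // Not found, or found but not loadable (e.g. NoClassDefFoundError).
            return false;
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = ClasspathCheck.class.getClassLoader();
        String[] names = {
            "org.agrona.collections.Long2ObjectHashMap", // class named in the error
            "java.util.HashMap"                           // sanity check, part of the JDK
        };
        for (String name : names) {
            System.out.println(name + " -> " + isOnClasspath(name, loader));
        }
    }
}
```

If the first line prints false, no jar on that classpath provides the agrona package, whatever transport is configured.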

What could be going wrong when I instantiate an ActorSystem inside JobManagerRunnerImpl?

Thanks,
Felipe

The full stack trace:
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:966)
at akka.dispatch.OnComplete.internal(Future.scala:264)
at akka.dispatch.OnComplete.internal(Future.scala:261)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NoClassDefFoundError: org/agrona/collections/Long2ObjectHashMap
at akka.remote.artery.compress.InboundCompressionsImpl.<init>(InboundCompressions.scala:60)
at akka.remote.artery.ArteryTransport.<init>(ArteryTransport.scala:332)
at akka.remote.artery.tcp.ArteryTcpTransport.<init>(ArteryTcpTransport.scala:67)
at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:211)
at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:874)
at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:870)
at akka.actor.ActorSystemImpl._start(ActorSystem.scala:870)
at akka.actor.ActorSystemImpl.start(ActorSystem.scala:891)
at akka.actor.RobustActorSystem$.internalApply(RobustActorSystem.scala:96)
at akka.actor.RobustActorSystem$.apply(RobustActorSystem.scala:70)
at akka.actor.RobustActorSystem$.create(RobustActorSystem.scala:55)
at org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:125)
at org.apache.flink.runtime.akka.AkkaUtils.createActorSystem(AkkaUtils.scala)
at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:166)
at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:82)
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$4(Dispatcher.java:426)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.agrona.collections.Long2ObjectHashMap
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 20 more
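
The trace above nests three throwables: the JobExecutionException wraps a NoClassDefFoundError raised in akka.remote.artery.compress.InboundCompressionsImpl (reached through ArteryTcpTransport), which in turn wraps the ClassNotFoundException. A small helper that walks the cause chain makes the innermost failure easy to log. This is a sketch with plain JDK types only; the names RootCause and rootCause are hypothetical, and a RuntimeException stands in for Flink's JobExecutionException.

```java
final class RootCause {

    /** Follows getCause() links until the innermost throwable is reached. */
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        // Stop at the end of the chain; also guard against a self-referential cause.
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Rebuild the same nesting as in the trace, using JDK exception types.
        Throwable inner =
            new ClassNotFoundException("org.agrona.collections.Long2ObjectHashMap");
        NoClassDefFoundError middle =
            new NoClassDefFoundError("org/agrona/collections/Long2ObjectHashMap");
        middle.initCause(inner);
        // Stand-in for org.apache.flink.runtime.client.JobExecutionException.
        Throwable outer = new RuntimeException("Job execution failed.", middle);

        System.out.println(rootCause(outer));
    }
}
```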

[1] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com