Hi all,
I have a PoC running on Flink where I instantiate a thread to run asynchronously on the JobManager. For this I am using the class JobManagerRunnerImpl [1] (at flink-runtime package) to create and start this thread. I want to replace this thread by an akka-remote process since Flink is already using this lib to exchange messages between JM and TMs. When I use any of the two lines to create the Akka ActorSystem on this class I get an error. ActorSystem system = ActorSystem.create("mySystem", config); ActorSystem system = AkkaUtils.createActorSystem("mySystem", config); The error is java.lang.ClassNotFoundException: org.agrona.collections.Long2ObjectHashMap. I search this class and this package (agrona) is from aeron-udp. However I am using tcp, with this configuration to create the ActorSystem: akka { actor { provider = remote allow-java-serialization = on } remote { use-unsafe-remote-features-outside-cluster = true artery { enabled = on transport = tcp canonical.hostname = "localhost" canonical.port = 2552 } } } What can be wrong to instantiate an ActorSystem inside JobManagerRunnerImpl? Thanks, Felipe The full stack trace: Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147) at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:966) at akka.dispatch.OnComplete.internal(Future.scala:264) at akka.dispatch.OnComplete.internal(Future.scala:261) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36) at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252) at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572) at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22) at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21) at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436) at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36) at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55) at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91) at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.NoClassDefFoundError: org/agrona/collections/Long2ObjectHashMap at akka.remote.artery.compress.InboundCompressionsImpl.<init>(InboundCompressions.scala:60) at akka.remote.artery.ArteryTransport.<init>(ArteryTransport.scala:332) at akka.remote.artery.tcp.ArteryTcpTransport.<init>(ArteryTcpTransport.scala:67) at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:211) at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:874) at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:870) at akka.actor.ActorSystemImpl._start(ActorSystem.scala:870) at akka.actor.ActorSystemImpl.start(ActorSystem.scala:891) at akka.actor.RobustActorSystem$.internalApply(RobustActorSystem.scala:96) at akka.actor.RobustActorSystem$.apply(RobustActorSystem.scala:70) at akka.actor.RobustActorSystem$.create(RobustActorSystem.scala:55) at org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:125) at org.apache.flink.runtime.akka.AkkaUtils.createActorSystem(AkkaUtils.scala) at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:166) at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:82) at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$4(Dispatcher.java:426) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.ClassNotFoundException: org.agrona.collections.Long2ObjectHashMap at java.net.URLClassLoader.findClass(URLClassLoader.java:382) at java.lang.ClassLoader.loadClass(ClassLoader.java:418) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ... 20 more [1] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java -- -- Felipe Gutierrez -- skype: felipe.o.gutierrez -- https://felipeogutierrez.blogspot.com |
Free forum by Nabble | Edit this page |