We have an HA cluster of 2 masters and 3 slaves. We run a jar through the Flink CLI, then cancel the running job. We then make some changes to the jar's source code, repackage it, deploy it again, and run it again through the CLI. The following error occurs:
java.lang.LinkageError: loader constraint violation: when resolving method "com.mapr.fs.jni.MapRTableTools.CheckAndReplaceOrDeleteRPC(JJLjava/nio/ByteBuffer;[Ljava/nio/ByteBuffer;[I[BLjava/nio/ByteBuffer;ZZLcom/mapr/fs/jni/MapRUpdateAndGet;)I" the class loader (instance of org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader) of the current class, com/mapr/fs/Inode, and the class loader (instance of sun/misc/Launcher$ExtClassLoader) for the method's defining class, com/mapr/fs/jni/MapRTableTools, have different Class objects for the type com/mapr/fs/jni/MapRUpdateAndGet used in the signature
	at com.mapr.fs.Inode.checkAndReplaceOrDelete(Inode.java:1777)
	at com.mapr.fs.MapRHTable.checkAndReplaceOrDelete(MapRHTable.java:799)
	at com.mapr.db.impl.MapRDBTableImpl._checkAndReplace(MapRDBTableImpl.java:1736)
	at com.mapr.db.impl.MapRDBTableImpl._insert(MapRDBTableImpl.java:1366)
	at com.mapr.db.impl.MapRDBTableImpl.insert(MapRDBTableImpl.java:1332)
	at com.kabbage.utils.OJAIUtils.insert(OJAIUtils.java:93)
	at com.kabbage.flink.MaprdbSink.invoke(MaprdbSink.java:31)
	at com.kabbage.flink.MaprdbSink.invoke(MaprdbSink.java:10)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
	at org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:161)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:253)
	at java.lang.Thread.run(Thread.java:745)
01/24/2017 19:15:50 Job execution switched to status FAILING.

This error disappears when we restart the cluster. The jar basically reads from MapR Streams and writes the messages to a MapR-DB sink. Could this error be caused by temporary files that are not cleaned up when I cancel the job?

I checked this thread [ thread ]. It said that stop is a more graceful way to stop jobs, but I guess it is not yet supported for the Kafka source. I get the following error when I try to stop my job:

java.lang.Exception: Stopping the job with ID 3bf393c79dc5597c1053fc934a0cfc44 failed.
	at org.apache.flink.client.CliFrontend.stop(CliFrontend.java:525)
	at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1014)
	at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
Caused by: java.lang.IllegalStateException: Job with ID 3bf393c79dc5597c1053fc934a0cfc44 is not stoppable.
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:580)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:121)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Hence, I had to resort to cancelling the job. Will I always need to restart my Flink cluster to resolve this error? Or am I missing some vital configuration?
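The LinkageError above says that two class loaders hold different Class objects for the same class name, com/mapr/fs/jni/MapRUpdateAndGet. The following is a minimal Java sketch of how that situation can arise in general: a hypothetical `Payload` class stands in for the MapR class, and a hypothetical `IsolatingLoader` defines it a second time from the same .class bytes instead of delegating to its parent. It only illustrates JVM behavior; it is not Flink's FlinkUserCodeClassLoader, and it assumes the file is compiled to .class files and run from the classpath on Java 9+ (for `readAllBytes`).

```java
import java.io.IOException;
import java.io.InputStream;

public class Main {

    // Hypothetical stand-in for a library class such as com.mapr.fs.jni.MapRUpdateAndGet.
    public static class Payload {
    }

    // Hypothetical loader that defines ONE chosen class itself (from the same bytes its
    // parent can see) instead of delegating; every other class is delegated parent-first.
    static final class IsolatingLoader extends ClassLoader {
        private final String isolatedName;

        IsolatingLoader(ClassLoader parent, String isolatedName) {
            super(parent);
            this.isolatedName = isolatedName;
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (!name.equals(isolatedName)) {
                return super.loadClass(name, resolve); // normal parent-first delegation
            }
            synchronized (getClassLoadingLock(name)) {
                Class<?> c = findLoadedClass(name);
                if (c == null) {
                    String resource = name.replace('.', '/') + ".class";
                    try (InputStream in = getParent().getResourceAsStream(resource)) {
                        if (in == null) {
                            throw new ClassNotFoundException(name);
                        }
                        byte[] bytes = in.readAllBytes();
                        // Same bytes, but this loader becomes the defining loader.
                        c = defineClass(name, bytes, 0, bytes.length);
                    } catch (IOException e) {
                        throw new ClassNotFoundException(name, e);
                    }
                }
                if (resolve) {
                    resolveClass(c);
                }
                return c;
            }
        }
    }

    // Loads Payload once through the application loader and once through IsolatingLoader.
    static Class<?>[] loadTwice() {
        ClassLoader app = Main.class.getClassLoader();
        String name = Payload.class.getName(); // "Main$Payload"
        try {
            Class<?> viaApp = app.loadClass(name);
            Class<?> viaIsolating = new IsolatingLoader(app, name).loadClass(name);
            return new Class<?>[] {viaApp, viaIsolating};
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Class<?>[] pair = loadTwice();
        System.out.println("same name:         " + pair[0].getName().equals(pair[1].getName()));
        System.out.println("same Class object: " + (pair[0] == pair[1]));
        System.out.println("assignable:        " + pair[0].isAssignableFrom(pair[1]));
    }
}
```

Run as-is, it should report the same name but two distinct, mutually non-assignable Class objects. A method signature mentioning such a type cannot be linked across the two loaders, which is the kind of conflict the JVM reports as a loader constraint violation.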
Hi Ani,
This error is independent of cancel vs. stop. It's an issue with how the MapR classes are loaded by the classloaders. Do your user jars contain any MapR code (either MapR Streams or MapR-DB)? If so, I would recommend putting these MapR libraries into Flink's "lib/" folder. They will then be loaded by the system classloader of the Flink JVMs.

Regards,
Robert

On Wed, Jan 25, 2017 at 5:10 PM, ani.desh1512 <[hidden email]> wrote:
Hi Robert,
Thanks for the answer. My code does indeed contain both the MapR Streams and MapR-DB jars. Here are the steps I followed based on your suggestion:

1. I copied only the mapr-streams-*.jar and maprdb*.jar files into lib.
2. Then I tried to run my jar, but I got a java.lang.NoClassDefFoundError for some maprfs class.
3. I added maprfs*.jar to lib and tried submitting my jar again.
4. This time I got a java.lang.NoClassDefFoundError for some hadoopfs class.
5. At this point I just created a symlink in the lib folder pointing to the MapR lib folder, which means that ALL the MapR-related jars are loaded by the system classloader.
6. This last step did the trick and I was able to get my job running. Also, I have not encountered the error I mentioned earlier after cancelling and resubmitting the job.

My only question is: is this the expected behavior and the normal solution? Do we really need to add ALL the jars? I could cherry-pick which jars to copy using the dependency tree, but doing that for every job feels cumbersome.
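Steps 2 to 4 above boil down to finding which jar provides the class named in each NoClassDefFoundError. As a middle ground between copying jars by trial and error and symlinking the whole MapR lib folder, that lookup could be automated. The following is a minimal Java sketch, assuming the candidate jars sit directly in one directory (no recursion into subfolders); `JarLocator` and `jarsContaining` are hypothetical names, and the directory is passed on the command line because the thread does not give the MapR lib path.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.JarFile;

public class JarLocator {

    // Returns every *.jar directly inside `dir` that contains `className`.
    // Accepts both dotted names (com.example.Foo) and the slashed form that
    // NoClassDefFoundError messages typically print (com/example/Foo).
    static List<Path> jarsContaining(Path dir, String className) throws IOException {
        String entryName = className.replace('.', '/') + ".class";
        List<Path> hits = new ArrayList<>();
        try (DirectoryStream<Path> jars = Files.newDirectoryStream(dir, "*.jar")) {
            for (Path jar : jars) {
                // JarFile opens the target of a symlinked jar as well.
                try (JarFile jf = new JarFile(jar.toFile())) {
                    if (jf.getJarEntry(entryName) != null) {
                        hits.add(jar);
                    }
                }
            }
        }
        hits.sort(null); // natural Path ordering, for stable output
        return hits;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: java JarLocator <jar-dir> <class-name>");
            System.exit(2);
        }
        for (Path jar : jarsContaining(Paths.get(args[0]), args[1])) {
            System.out.println(jar);
        }
    }
}
```

This resolves one missing class at a time and does not follow transitive dependencies, so it may still take a few rounds of submit, read the NoClassDefFoundError, look up the jar, and copy it.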