Issues while restarting a job on HA cluster

ani.desh1512
We have an HA cluster with 2 masters and 3 slaves. We run a jar through the Flink CLI and then cancel the running job. We then change the jar's source code, repackage it, redeploy it, and run it again through the CLI. The following error occurs:
               

java.lang.LinkageError: loader constraint violation: when resolving method "com.mapr.fs.jni.MapRTableTools.CheckAndReplaceOrDeleteRPC(JJLjava/nio/ByteBuffer;[Ljava/nio/ByteBuffer;[I[BLjava/nio/ByteBuffer;ZZLcom/mapr/fs/jni/MapRUpdateAndGet;)I" the class loader (instance of org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader) of the current class, com/mapr/fs/Inode, and the class loader (instance of sun/misc/Launcher$ExtClassLoader) for the method's defining class, com/mapr/fs/jni/MapRTableTools, have different Class objects for the type com/mapr/fs/jni/MapRUpdateAndGet used in the signature
        at com.mapr.fs.Inode.checkAndReplaceOrDelete(Inode.java:1777)
        at com.mapr.fs.MapRHTable.checkAndReplaceOrDelete(MapRHTable.java:799)
        at com.mapr.db.impl.MapRDBTableImpl._checkAndReplace(MapRDBTableImpl.java:1736)
        at com.mapr.db.impl.MapRDBTableImpl._insert(MapRDBTableImpl.java:1366)
        at com.mapr.db.impl.MapRDBTableImpl.insert(MapRDBTableImpl.java:1332)
        at com.kabbage.utils.OJAIUtils.insert(OJAIUtils.java:93)
        at com.kabbage.flink.MaprdbSink.invoke(MaprdbSink.java:31)
        at com.kabbage.flink.MaprdbSink.invoke(MaprdbSink.java:10)
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
        at org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:161)
        at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:253)
        at java.lang.Thread.run(Thread.java:745)

01/24/2017 19:15:50     Job execution switched to status FAILING.
java.lang.LinkageError: loader constraint violation: when resolving method "com.mapr.fs.jni.MapRTableTools.CheckAndReplaceOrDeleteRPC(JJLjava/nio/ByteBuffer;[Ljava/nio/ByteBuffer;[I[BLjava/nio/ByteBuffer;ZZLcom/mapr/fs/jni/MapRUpdateAndGet;)I" the class loader (instance of org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader) of the current class, com/mapr/fs/Inode, and the class loader (instance of sun/misc/Launcher$ExtClassLoader) for the method's defining class, com/mapr/fs/jni/MapRTableTools, have different Class objects for the type com/mapr/fs/jni/MapRUpdateAndGet used in the signature
        at com.mapr.fs.Inode.checkAndReplaceOrDelete(Inode.java:1777)
        at com.mapr.fs.MapRHTable.checkAndReplaceOrDelete(MapRHTable.java:799)
        at com.mapr.db.impl.MapRDBTableImpl._checkAndReplace(MapRDBTableImpl.java:1736)
        at com.mapr.db.impl.MapRDBTableImpl._insert(MapRDBTableImpl.java:1366)
        at com.mapr.db.impl.MapRDBTableImpl.insert(MapRDBTableImpl.java:1332)
        at com.kabbage.utils.OJAIUtils.insert(OJAIUtils.java:93)
        at com.kabbage.flink.MaprdbSink.invoke(MaprdbSink.java:31)
        at com.kabbage.flink.MaprdbSink.invoke(MaprdbSink.java:10)
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
        at org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:161)
        at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:253)
        at java.lang.Thread.run(Thread.java:745)


This error disappears when we restart the cluster.
The jar reads from MapR Streams and writes the messages to a MapR-DB sink. Could this error be caused by temp files that are not cleared when I cancel the job?
I checked this thread (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Diff-between-stop-and-cancel-job-td6697.html). It says stop is a more graceful way of stopping jobs, but I guess it is not yet supported for the Kafka source. I get the following error when I try to stop my job:

        java.lang.Exception: Stopping the job with ID 3bf393c79dc5597c1053fc934a0cfc44 failed.
     at org.apache.flink.client.CliFrontend.stop(CliFrontend.java:525)
     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1014)
     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
Caused by: java.lang.IllegalStateException: Job with ID 3bf393c79dc5597c1053fc934a0cfc44 is not stoppable.
     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:580)
     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
     at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
     at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
     at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:121)
     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Hence, I had to resort to cancelling the job.
Will I always need to restart my Flink cluster to resolve this error, or am I missing some vital configuration?
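
One way to narrow down an error like this is to log which class loader and which jar a given class was actually loaded from. The following is a minimal sketch, not part of Flink or MapR: `ClassOrigin` and `describe` are hypothetical names, and the assumption is that the helper is called from inside the job (for example from the sink) so that it sees the same loaders as the failing code.

```java
import java.security.CodeSource;

public class ClassOrigin {

    /** Returns a one-line description of the loader and code location of a class. */
    public static String describe(Class<?> type) {
        ClassLoader loader = type.getClassLoader();
        // Core JDK classes come from the bootstrap loader, which is reported as null.
        String loaderName = (loader == null) ? "bootstrap" : loader.getClass().getName();
        CodeSource source = type.getProtectionDomain().getCodeSource();
        String location = (source == null || source.getLocation() == null)
                ? "unknown"
                : source.getLocation().toString();
        return type.getName() + " <- " + loaderName + " @ " + location;
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Pass fully qualified class names, e.g. com.mapr.fs.jni.MapRUpdateAndGet
        for (String name : args) {
            System.out.println(describe(Class.forName(name)));
        }
    }
}
```

If the same class name is reported with two different loaders or jar locations, the class is present in more than one place on the cluster.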

Re: Issues while restarting a job on HA cluster

rmetzger0
Hi Ani,

This error is independent of cancel vs. stop. It's an issue with how the MapR classes are loaded by the classloaders.

Do your user jars contain any MapR code (either MapR Streams or MapR-DB)?

If so, I would recommend putting these MapR libraries into the "lib/" folder of Flink. They'll then be loaded by the system classloader of the Flink JVMs.
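
The "different Class objects" wording in the LinkageError can be reproduced in isolation. The sketch below is a hypothetical illustration only, not Flink or MapR code: `Payload` stands in for a type such as `MapRUpdateAndGet`, and `IsolatingLoader` is an assumed loader that defines the class itself rather than delegating, playing the role of a user-code classloader. The same bytes loaded through two loaders yield two distinct `Class` objects.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class LoaderIdentityDemo {

    /** Loader that defines the target class itself instead of delegating to its parent. */
    static class IsolatingLoader extends ClassLoader {
        private final String target;

        IsolatingLoader(ClassLoader parent, String target) {
            super(parent);
            this.target = target;
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (!name.equals(target)) {
                return super.loadClass(name, resolve); // normal parent-first delegation
            }
            synchronized (getClassLoadingLock(name)) {
                Class<?> loaded = findLoadedClass(name);
                if (loaded == null) {
                    byte[] bytes = readBytes(name);
                    loaded = defineClass(name, bytes, 0, bytes.length);
                }
                if (resolve) {
                    resolveClass(loaded);
                }
                return loaded;
            }
        }

        /** Reads the compiled class file through the parent's resource lookup. */
        private byte[] readBytes(String name) throws ClassNotFoundException {
            String resource = name.replace('.', '/') + ".class";
            try (InputStream in = getParent().getResourceAsStream(resource)) {
                if (in == null) {
                    throw new ClassNotFoundException(name);
                }
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] buffer = new byte[4096];
                int n;
                while ((n = in.read(buffer)) != -1) {
                    out.write(buffer, 0, n);
                }
                return out.toByteArray();
            } catch (IOException e) {
                throw new ClassNotFoundException(name, e);
            }
        }
    }

    /** Loads Payload through two loaders and reports whether the Class objects are identical. */
    public static boolean sameClassObject() throws ClassNotFoundException {
        String name = Payload.class.getName();
        ClassLoader parent = Payload.class.getClassLoader();
        Class<?> viaParent = Class.forName(name, false, parent);
        Class<?> viaChild = new IsolatingLoader(parent, name).loadClass(name);
        return viaParent == viaChild;
    }

    public static void main(String[] args) throws ClassNotFoundException {
        System.out.println("same Class object: " + sameClassObject());
    }
}

/** Stand-in for a library type that ends up visible to two loaders; the name is hypothetical. */
class Payload {}
```

When a method signature mentions such a type and the caller and callee resolve it through different loaders, the JVM rejects the call with a loader constraint violation. Keeping the library in exactly one place avoids the duplicate definition.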

Regards,
Robert



Re: Issues while restarting a job on HA cluster

ani.desh1512
Hi Robert,
Thanks for the answer.
My code does actually contain both the MapR Streams and MapR-DB jars. Here are the steps I followed based on your suggestion:
1. I copied only the mapr-streams-*.jar and maprdb*.jar into Flink's lib folder.
2. Then I tried to run my jar, but I got a java.lang.NoClassDefFoundError for some maprfs class.
3. I added maprfs*.jar to lib and tried submitting my jar again.
4. This time I got a java.lang.NoClassDefFoundError for some hadoopfs class.
5. At this point I just created a symlink in the lib folder pointing to the MapR lib folder, so that ALL the MapR-related jars are loaded by the system classloader.
6. This last step did the trick and I was able to get my job running. Also, I have not yet encountered the error I mentioned earlier after cancelling and resubmitting the job.

My only question is: is this the expected behavior and the normal solution? Do we really need to add ALL the jars? I could cherry-pick which jars to copy by using the dependency tree, but doing that for all the jobs feels cumbersome.
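
As a lighter alternative to walking the dependency tree, the class named in each NoClassDefFoundError can be looked up directly in a directory of jars. This is a minimal sketch under stated assumptions: `JarClassFinder` and `jarsContaining` are hypothetical names, the directory and class name are supplied by the caller, and only jars directly inside the directory are scanned.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.jar.JarFile;

public class JarClassFinder {

    /** Lists the jars directly under dir that contain the given fully qualified class. */
    public static List<String> jarsContaining(Path dir, String className) throws IOException {
        // A class a.b.C is stored in a jar as the entry a/b/C.class
        String entry = className.replace('.', '/') + ".class";
        List<String> hits = new ArrayList<>();
        try (DirectoryStream<Path> jars = Files.newDirectoryStream(dir, "*.jar")) {
            for (Path jar : jars) {
                try (JarFile jarFile = new JarFile(jar.toFile())) {
                    if (jarFile.getEntry(entry) != null) {
                        hits.add(jar.getFileName().toString());
                    }
                }
            }
        }
        Collections.sort(hits);
        return hits;
    }

    public static void main(String[] args) throws IOException {
        // Usage: java JarClassFinder <jar-directory> <fully.qualified.ClassName>
        for (String jar : jarsContaining(Paths.get(args[0]), args[1])) {
            System.out.println(jar);
        }
    }
}
```

A class that shows up in more than one jar is also worth noting, since duplicate definitions are what triggered the original LinkageError.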