classpath issues with MapR tables on Flink

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

classpath issues with MapR tables on Flink

ebarsalou
I'm trying to run 2 jobs on a flink cluster; one of them writes to a MapR-DB binary table and the other writes to a MapR-DB json table.  Both of these jobs work fine, if I run them independently on a local cluster, but I can't run the job that writes to the binary table, unless I restart the local cluster after I run the job that writes to the json table.  Even if the job writing to the json table isn't currently running, I still have to restart the local cluster in order to reload the classpath.

I end up getting the same classpath issue when I run the binary table job on a standalone cluster, even if I restart the cluster between jobs.

The application I'm running is an uber jar, with the hbase-client mapr jar baked in, so BaseTableMappingRules is already in the application jar.

The issue I'm getting with the job that writes to the binary table (on the standalone cluster) is:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
        at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
        at testconsumer.TestConsumer$.main(TestConsumer.scala:45)
        at testconsumer.TestConsumer.main(TestConsumer.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
        at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
        at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
        at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
        at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: java.lang.RuntimeException: Error occurred while instantiating com.mapr.fs.hbase.MapRTableMappingRules.
==> org/apache/hadoop/hbase/client/mapr/BaseTableMappingRules.
        at org.apache.hadoop.hbase.client.mapr.TableMappingRulesFactory.create(TableMappingRulesFactory.java:68)
        at org.apache.hadoop.hbase.client.HTable.initIfMapRTableImpl(HTable.java:475)
        at org.apache.hadoop.hbase.client.HTable.initIfMapRTable(HTable.java:443)
        at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:194)
        at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:161)
        at hbase.HBaseRequests.configure(HBaseRequests.scala:34)
        at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:57)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:386)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error occurred while instantiating com.mapr.fs.hbase.MapRTableMappingRules.
==> org/apache/hadoop/hbase/client/mapr/BaseTableMappingRules.
        at org.apache.hadoop.hbase.client.mapr.GenericHFactory.getImplementorInstance(GenericHFactory.java:40)
        at org.apache.hadoop.hbase.client.mapr.TableMappingRulesFactory.create(TableMappingRulesFactory.java:50)
        ... 12 more
Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/client/mapr/BaseTableMappingRules
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
        at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
        at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:264)
        at org.apache.hadoop.hbase.client.mapr.GenericHFactory.getImplementorInstance(GenericHFactory.java:30)
        ... 13 more
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.client.mapr.BaseTableMappingRules
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 29 more


Is there a way to resolve this issue so that both json & binary table applications can run on the same cluster without having this classpath issue w/ the binary table job?