Re: Classloader issue using AvroParquetInputFormat via HadoopInputFormat

Posted by Ufuk Celebi on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Classloader-issue-using-AvroParquetInputFormat-via-HadoopInputFormat-tp8381p8402.html

I've started a vote for 1.1.1, whose artifacts should hopefully be fixed. If
you have any spare time, would you mind checking whether it resolves your
problem?

The artifacts are here: http://home.apache.org/~uce/flink-1.1.1-rc1/

You would have to add the following repository to your Maven project
and update the Flink version to 1.1.1:

<repositories>
  <repository>
    <id>flink-rc</id>
    <name>flink-rc</name>
    <url>https://repository.apache.org/content/repositories/orgapacheflink-1101</url>
    <releases>
      <enabled>true</enabled>
    </releases>
    <snapshots>
      <enabled>false</enabled>
    </snapshots>
  </repository>
</repositories>

Would really appreciate it!

On Tue, Aug 9, 2016 at 2:11 PM, Ufuk Celebi <[hidden email]> wrote:

> This is a problem with the Maven artifacts of 1.1.0 :-( I've added a
> warning to the release notes and will start an emergency vote for 1.1.1,
> which only updates the Maven artifacts.
>
> On Tue, Aug 9, 2016 at 1:20 PM, LINZ, Arnaud <[hidden email]> wrote:
>> Okay,
>>
>> That would also solve my issue.
>>
>> Greetings,
>>
>> Arnaud
>>
>> From: Stephan Ewen [mailto:[hidden email]]
>> Sent: Tuesday, August 9, 2016 12:41
>> To: [hidden email]
>> Subject: Re: Classloader issue using AvroParquetInputFormat via
>> HadoopInputFormat
>>
>> Hi Shannon!
>>
>> It seems that something in the Maven deployment went wrong with this
>> release.
>>
>> There should be:
>>
>>   - flink-java (the default, with a transitive dependency on Hadoop 2.x for
>> Hadoop compatibility features)
>>
>>   - flink-java-hadoop1 (with a transitive dependency on Hadoop 1.x for
>> older Hadoop compatibility features)
>>
>> Apparently the "flink-java" artifact got overwritten with the
>> "flink-java-hadoop1" artifact. Damn.
>>
>> I think we need to release new artifacts that fix these dependency
>> descriptors.
>>
>> That needs to be a 1.1.1 release, because Maven artifacts cannot be changed
>> after they have been deployed.
>>
>> Greetings,
>> Stephan
>>
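To check Stephan's diagnosis locally, one can look at which shaded Hadoop artifact a downloaded POM actually declares (for example the `flink-java-1.1.0.pom` file in the local Maven repository). The following is a minimal sketch that uses only the JDK's DOM parser; the class name `PomHadoopCheck` and the sample POM text are hypothetical, and the check matches any `<dependency>` element, including ones under `<dependencyManagement>` or plugins.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

class PomHadoopCheck {

    // artifactIds of <dependency> elements that mention "hadoop".
    // Note: this also matches dependencies under <dependencyManagement> or plugins.
    static List<String> hadoopDependencies(String pomXml) {
        Document doc;
        try {
            doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new ByteArrayInputStream(pomXml.getBytes(StandardCharsets.UTF_8)));
        } catch (ParserConfigurationException | SAXException | IOException e) {
            throw new IllegalArgumentException("Could not parse POM", e);
        }
        List<String> result = new ArrayList<>();
        NodeList deps = doc.getElementsByTagName("dependency");
        for (int i = 0; i < deps.getLength(); i++) {
            String artifactId = childText((Element) deps.item(i), "artifactId");
            if (artifactId != null && artifactId.contains("hadoop")) {
                result.add(artifactId);
            }
        }
        return result;
    }

    // Text of the first direct child element named `tag`, so that artifactIds
    // inside a dependency's <exclusions> are not picked up by mistake.
    private static String childText(Element parent, String tag) {
        for (Node n = parent.getFirstChild(); n != null; n = n.getNextSibling()) {
            if (n.getNodeType() == Node.ELEMENT_NODE && tag.equals(n.getNodeName())) {
                return n.getTextContent().trim();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Hypothetical, heavily trimmed POM; not the real flink-java descriptor.
        String pom = "<project><dependencies>"
                + "<dependency><artifactId>flink-core</artifactId></dependency>"
                + "<dependency><artifactId>flink-shaded-hadoop1_2.10</artifactId></dependency>"
                + "</dependencies></project>";
        System.out.println(hadoopDependencies(pom)); // prints [flink-shaded-hadoop1_2.10]
    }
}
```

Fed the text of the real descriptor, a `flink-java` POM that lists `flink-shaded-hadoop1_2.10` would be consistent with the overwrite Stephan describes.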
>> On Mon, Aug 8, 2016 at 11:08 PM, Shannon Carey <[hidden email]> wrote:
>>
>> Correction: I cannot work around the problem. If I exclude hadoop1, I get
>> the following exception, which appears to be due to flink-java-1.1.0's
>> dependency on Hadoop 1.
>>
>> Failed to submit job 4b6366d101877d38ef33454acc6ca500 (com.expedia.www.flink.jobs.DestinationCountsHistoryJob$)
>>
>> org.apache.flink.runtime.client.JobExecutionException: Failed to submit job 4b6366d101877d38ef33454acc6ca500 (com.expedia.www.flink.jobs.DestinationCountsHistoryJob$)
>>     at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1281)
>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:478)
>>     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>     at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>>     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>     at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>     at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:121)
>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected
>>     at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:172)
>>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:695)
>>     at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1178)
>>     ... 19 more
>> Caused by: java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected
>>     at org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:158)
>>     at org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:56)
>>     at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
>>     ... 21 more
>>
>> And if I exclude hadoop2, I get the exception from my previous email with
>> AvroParquetInputFormat.
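Both exceptions point at the same root cause: `org.apache.hadoop.mapreduce.JobContext` is a class in Hadoop 1.x but an interface in Hadoop 2.x, so bytecode compiled against one fails to link against the other. A small reflective check can show which variant a given classpath exposes. This is a sketch; `TypeKindCheck` is a hypothetical name, and it loads the type with `initialize = false` so that no static initializer runs, while still catching `LinkageError`s raised during loading.

```java
class TypeKindCheck {

    // Reports whether a type is visible as an interface or a class,
    // without running its static initializer.
    static String describe(String className) {
        try {
            Class<?> c = Class.forName(className, false, TypeKindCheck.class.getClassLoader());
            return c.isInterface() ? "interface" : "class";
        } catch (ClassNotFoundException e) {
            return "not found";
        } catch (LinkageError e) {
            // e.g. IncompatibleClassChangeError when a supertype changed kind between versions
            return "linkage error: " + e;
        }
    }

    public static void main(String[] args) {
        // With only Hadoop 2 on the classpath this should report "interface";
        // if a Hadoop 1 jar shadows it, "class".
        String[] names = args.length > 0 ? args
                : new String[] {"org.apache.hadoop.mapreduce.JobContext"};
        for (String name : names) {
            System.out.println(name + " -> " + describe(name));
        }
    }
}
```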
>>
>> From: Shannon Carey <[hidden email]>
>> Date: Monday, August 8, 2016 at 2:46 PM
>> To: "[hidden email]" <[hidden email]>
>> Subject: Classloader issue using AvroParquetInputFormat via
>> HadoopInputFormat
>>
>> Hi folks, congrats on 1.1.0!
>>
>> FYI, after updating to Flink 1.1.0 I get the exception below when
>> attempting to run a job that uses AvroParquetInputFormat wrapped in a Flink
>> HadoopInputFormat. Line 71 of ContextUtil.java is trying to execute:
>>
>> Class.forName("org.apache.hadoop.mapreduce.task.JobContextImpl");
>>
>> I am using Scala 2.11.7. JobContextImpl is coming from
>> flink-shaded-hadoop2:1.1.0. However, its parent class (JobContext) is
>> actually being loaded (according to output with JVM param "-verbose:class")
>> from the flink-shaded-hadoop1_2.10 jar.
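Besides `-verbose:class`, a class's `ProtectionDomain` usually records the JAR or directory it was loaded from, which is handy for checking a single type such as `JobContext`. A minimal sketch follows; `ClassOrigin` is a hypothetical name, JDK bootstrap classes report no code source, and the lookup may be denied under a security manager.

```java
import java.security.CodeSource;

class ClassOrigin {

    // Location (JAR or directory) a class was loaded from, or a short reason if unavailable.
    static String locate(String className) {
        try {
            Class<?> c = Class.forName(className, false, ClassOrigin.class.getClassLoader());
            CodeSource source = c.getProtectionDomain().getCodeSource();
            if (source == null || source.getLocation() == null) {
                return "no code source (e.g. a JDK bootstrap class)";
            }
            return source.getLocation().toString();
        } catch (ClassNotFoundException e) {
            return "not found";
        } catch (LinkageError e) {
            // A mismatched supertype can already fail while the class is being defined.
            return "linkage error: " + e;
        }
    }

    public static void main(String[] args) {
        String[] names = args.length > 0 ? args : new String[] {
            "org.apache.hadoop.mapreduce.JobContext",
            "org.apache.hadoop.mapreduce.task.JobContextImpl"
        };
        for (String name : names) {
            System.out.println(name + " <- " + locate(name));
        }
    }
}
```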
>>
>> After adding an exclusion on flink-shaded-hadoop1_2.10, the problem appears
>> to be resolved. Is that the right way to fix the problem?
>>
>> From what I can tell, the problem is that the JARs that are deployed to
>> Maven Central were built with different versions of Hadoop (as controlled by
>> hadoop.profile):
>>
>> flink-runtime_2.11 depends on Hadoop 2
>> flink-java depends on Hadoop 1 (Scala 2.10)
>> flink-core depends on Hadoop 1 (Scala 2.10)
>>
>> This seems like a problem with Flink's build process.
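Until fixed artifacts are available, a cheap guard against the mixed build described above is to fail fast when shaded Hadoop 1 and Hadoop 2 JARs share the classpath. This heuristic sketch only inspects `java.class.path` (so it will not see JARs added by other class loaders, such as a user-code class loader) and matches on the `flink-shaded-hadoop1` / `flink-shaded-hadoop2` file-name prefixes seen in this thread; `ShadedHadoopGuard` is a hypothetical name.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

class ShadedHadoopGuard {

    // Classpath entries whose file name starts with the given prefix.
    static List<String> entriesStartingWith(String classPath, String prefix) {
        List<String> hits = new ArrayList<>();
        for (String entry : classPath.split(Pattern.quote(File.pathSeparator))) {
            if (new File(entry).getName().startsWith(prefix)) {
                hits.add(entry);
            }
        }
        return hits;
    }

    // True when shaded Hadoop 1 and Hadoop 2 JARs are both present.
    static boolean hasMixedHadoop(String classPath) {
        return !entriesStartingWith(classPath, "flink-shaded-hadoop1").isEmpty()
                && !entriesStartingWith(classPath, "flink-shaded-hadoop2").isEmpty();
    }

    public static void main(String[] args) {
        String classPath = System.getProperty("java.class.path", "");
        if (hasMixedHadoop(classPath)) {
            // Fail fast instead of hitting IncompatibleClassChangeError later.
            throw new IllegalStateException("Mixed shaded Hadoop JARs on the classpath: "
                    + entriesStartingWith(classPath, "flink-shaded-hadoop"));
        }
        System.out.println("No Hadoop 1/Hadoop 2 mix detected on java.class.path");
    }
}
```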
>>
>> As an aside: would it be possible to change the interface of
>> HadoopInputFormat to take a Configuration instead of a Job? That would
>> reduce the dependence on the Hadoop API somewhat. It doesn't look like the
>> Job itself is ever actually used for anything. I'm glad to see you already
>> have https://issues.apache.org/jira/browse/FLINK-4316 and
>> https://issues.apache.org/jira/browse/FLINK-4315
>>
>> Thanks,
>>
>> Shannon
>>
>> java.lang.IncompatibleClassChangeError: Implementing class
>>     at java.lang.ClassLoader.defineClass1(Native Method)
>>     at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
>>     at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>>     at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
>>     at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
>>     at java.security.AccessController.doPrivileged(Native Method)
>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>     at java.lang.Class.forName0(Native Method)
>>     at java.lang.Class.forName(Class.java:264)
>>     at org.apache.parquet.hadoop.util.ContextUtil.<clinit>(ContextUtil.java:71)
>>     at org.apache.parquet.avro.AvroParquetInputFormat.setRequestedProjection(AvroParquetInputFormat.java:54)
>>     at com.expedia.www.sdk.flink.HistoricalDataIngestionJob.readHistoricalParquetFile(HistoricalDataIngestionJob.scala:63)
>>     at com.expedia.www.flink.jobs.DestinationCountsHistoryJob$.main(DestinationCountsHistoryJob.scala:25)
>>     at com.expedia.www.flink.jobs.DestinationCountsHistoryTest$$anonfun$1.apply$mcV$sp(DestinationCountsHistoryTest.scala:23)
>>     at com.expedia.www.flink.jobs.DestinationCountsHistoryTest$$anonfun$1.apply(DestinationCountsHistoryTest.scala:20)
>>     at com.expedia.www.flink.jobs.DestinationCountsHistoryTest$$anonfun$1.apply(DestinationCountsHistoryTest.scala:20)
>>     at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
>>     at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
>>     at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
>>     at org.scalatest.Transformer.apply(Transformer.scala:22)
>>     at org.scalatest.Transformer.apply(Transformer.scala:20)
>>     at org.scalatest.FlatSpecLike$$anon$1.apply(FlatSpecLike.scala:1647)
>>     at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
>>     at org.scalatest.FlatSpec.withFixture(FlatSpec.scala:1683)
>>     at org.scalatest.FlatSpecLike$class.invokeWithFixture$1(FlatSpecLike.scala:1644)
>>     at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1656)
>>     at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1656)
>>     at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
>>     at org.scalatest.FlatSpecLike$class.runTest(FlatSpecLike.scala:1656)
>>     at org.scalatest.FlatSpec.runTest(FlatSpec.scala:1683)
>>     at org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1714)
>>     at org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1714)
>>     at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
>>     at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
>>     at scala.collection.immutable.List.foreach(List.scala:381)
>>     at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
>>     at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:390)
>>     at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:427)
>>     at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
>>     at scala.collection.immutable.List.foreach(List.scala:381)
>>     at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
>>     at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
>>     at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
>>     at org.scalatest.FlatSpecLike$class.runTests(FlatSpecLike.scala:1714)
>>     at org.scalatest.FlatSpec.runTests(FlatSpec.scala:1683)
>>     at org.scalatest.Suite$class.run(Suite.scala:1424)
>>     at org.scalatest.FlatSpec.org$scalatest$FlatSpecLike$$super$run(FlatSpec.scala:1683)
>>     at org.scalatest.FlatSpecLike$$anonfun$run$1.apply(FlatSpecLike.scala:1760)
>>     at org.scalatest.FlatSpecLike$$anonfun$run$1.apply(FlatSpecLike.scala:1760)
>>     at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
>>     at org.scalatest.FlatSpecLike$class.run(FlatSpecLike.scala:1760)
>>     at com.expedia.www.flink.jobs.DestinationCountsHistoryTest.org$scalatest$BeforeAndAfterAll$$super$run(DestinationCountsHistoryTest.scala:12)
>>     at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257)
>>     at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256)
>>     at com.expedia.www.flink.jobs.DestinationCountsHistoryTest.run(DestinationCountsHistoryTest.scala:12)
>>     at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55)
>>     at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563)
>>     at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557)
>>     at scala.collection.immutable.List.foreach(List.scala:381)
>>     at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557)
>>     at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1044)
>>     at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1043)
>>     at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:2722)
>>     at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1043)
>>     at org.scalatest.tools.Runner$.run(Runner.scala:883)
>>     at org.scalatest.tools.Runner.run(Runner.scala)
>>     at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:138)
>>     at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)