Hello,
We have recently upgraded Flink to version 1.4.2. Now our jobs that rely on Parquet/Avro files located on HDFS have stopped working. I get this exception:

Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'CHAIN DataSource (READING_RECORDS) -> Map (MAPPING_RECORDS)': Deserializing the InputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat@50bb10fd) failed: unread block data
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:168)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1277)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:447)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:107)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Deserializing the InputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat@50bb10fd) failed: unread block data
    at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:66)
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:164)
    ... 24 common frames omitted
Caused by: java.lang.IllegalStateException: unread block data
    at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2740)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:437)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:424)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:412)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:373)
    at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
    at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:63)
    ... 25 common frames omitted

In some other topic I have read about a problem with primitives, but I don't know whether that is similar to the serialization of HadoopInputFormat here. Do you have any idea what could be wrong? We have also upgraded flink-hadoop-compatibility to the new version on the Flink classpath. We were jumping from version 1.3.1 (where everything worked fine).
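For context, here is a minimal sketch of the kind of job that hits this. The original post does not show the job code, so AvroParquetInputFormat and the hdfs:///data/records path are assumptions for illustration; the relevant part is that the HadoopInputFormat wrapper is serialized into the job graph and deserialized on the JobManager, which is the step that fails above.

    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.parquet.avro.AvroParquetInputFormat;

    public class ReadParquetRecords {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Wrap the Hadoop mapreduce InputFormat in Flink's HadoopInputFormat.
            // This wrapper object is what the JobManager deserializes at
            // InputFormatVertex.initializeOnMaster, where the trace above fails.
            Job job = Job.getInstance();
            HadoopInputFormat<Void, GenericRecord> input = new HadoopInputFormat<>(
                    new AvroParquetInputFormat<GenericRecord>(),
                    Void.class, GenericRecord.class, job);
            // hdfs:///data/records is a placeholder path
            AvroParquetInputFormat.addInputPath(job, new Path("hdfs:///data/records"));

            env.createInput(input)            // DataSource (READING_RECORDS)
               .map(new MapFunction<Tuple2<Void, GenericRecord>, String>() {
                   @Override
                   public String map(Tuple2<Void, GenericRecord> record) {
                       return record.f1.toString();   // Map (MAPPING_RECORDS)
                   }
               })
               .print();
        }
    }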
One thing that changed in Flink 1.4 with respect to Hadoop is that Hadoop is now an optional dependency. Since Hadoop dependencies are now dynamically loaded, you might be using different versions on the client and on the cluster. Also, the order in which classes are loaded changed. You could try to restore the previous classloading behavior by setting the parameter classloader.resolve-order: parent-first
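In flink-conf.yaml that is the following entry:

    classloader.resolve-order: parent-first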
See the release notes [1] for details on Hadoop-free Flink and the classloading changes.

2018-03-15 15:35 GMT+01:00 eSKa <[hidden email]>:
> we were jumping from version 1.3.1 (where everything worked fine)
Thanks a lot. It seems to work.
What is the default classloading order now? To keep this working in the new version, how should I provide the Hadoop dependencies so that they are picked up properly? The class that is failing (HadoopInputFormat) is from the hadoop-compatibility library, which I have upgraded to version 1.4.2 like everything else.
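For reference, this is the dependency in question; the _2.11 suffix assumes a Scala 2.11 build of Flink, so adjust it to match your setup:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-hadoop-compatibility_2.11</artifactId>
        <version>1.4.2</version>
    </dependency>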