Hello,
I'm testing the new DataSet.collect() method on version 0.9-milestone-1, but I obtain the following error on cluster execution (no problem with local execution), which also causes the job manager to crash:

14:05:41,145 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying CHAIN Cross(Cross at main(Test01.java:53)) -> Map (Map at main(Test01.java:54)) -> FlatMap (FlatMap at collect(DataSet.java:413)) (1/1) (attempt #0) to india3
14:05:41,211 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying DataSink (org.apache.flink.api.java.io.DiscardingOutputFormat@4386f16) (1/1) (attempt #0) to india3
14:05:41,269 INFO org.apache.flink.runtime.jobmanager.JobManager - Status of job 254ba2f06f7a9c4d454ca7288dae4092 (Flink Java Job at Mon May 04 14:05:39 CEST 2015) changed to FINISHED .
14:05:41,284 ERROR akka.actor.OneForOneStrategy - java.io.StreamCorruptedException: invalid type code: 00
org.apache.commons.lang3.SerializationException: java.io.StreamCorruptedException: invalid type code: 00
    at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:232)
    at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:268)
    at org.apache.flink.api.common.accumulators.ListAccumulator.getLocalValue(ListAccumulator.java:51)
    at org.apache.flink.api.common.accumulators.ListAccumulator.getLocalValue(ListAccumulator.java:35)
    at org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager.getJobAccumulatorResults(AccumulatorManager.java:77)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:300)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:37)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:30)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:30)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:91)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.StreamCorruptedException: invalid type code: 00
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1379)
    at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1959)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1959)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:224)
    ... 24 more
14:05:41,290 INFO org.apache.flink.runtime.jobmanager.JobManager - Stopping JobManager akka://flink/user/jobmanager#-828467473.
14:05:41,297 ERROR org.apache.flink.runtime.jobmanager.JobManager - Actor akka://flink/user/jobmanager#-828467473 terminated, stopping process...

Is this a known issue? Am I doing something wrong?

Thanks
Flavio
Hi Flavio!

This issue is known and has been fixed already. It occurs when you use custom types in collect(), because it uses the wrong classloader/serializer to transfer them.

The current master should not have this issue any more.

Greetings,
Stephan

On Mon, May 4, 2015 at 2:09 PM, Flavio Baronti <[hidden email]> wrote:
Hello,
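To illustrate what "the wrong classloader" means in general terms, here is a minimal sketch of class-loader-aware Java deserialization. It is not Flink's actual fix, and every name in it (ClassLoaderAwareDeserialization, ClassLoaderObjectInputStream, Point) is hypothetical. The idea, under those assumptions, is that a custom type lives in the user's job jar, so the stream reading it back has to resolve classes through a class loader that can see that jar rather than through whatever loader the framework code happens to use.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class ClassLoaderAwareDeserialization {

    /** ObjectInputStream that resolves classes through an explicitly supplied class loader. */
    static final class ClassLoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader classLoader;

        ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
            super(in);
            this.classLoader = classLoader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                // Look the class up in the supplied loader (e.g. the one that loaded the user jar).
                return Class.forName(desc.getName(), false, classLoader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default resolution, which also handles primitive types.
                return super.resolveClass(desc);
            }
        }
    }

    /** Serializes a value with plain Java serialization. */
    static byte[] serialize(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    /** Deserializes a value, resolving its classes through the given class loader. */
    static Object deserialize(byte[] data, ClassLoader classLoader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new ClassLoaderObjectInputStream(new ByteArrayInputStream(data), classLoader)) {
            return in.readObject();
        }
    }

    /** Hypothetical custom type standing in for a user-defined class passed to collect(). */
    static final class Point implements Serializable {
        private static final long serialVersionUID = 1L;
        final int x;
        final int y;

        Point(int x, int y) {
            this.x = x;
            this.y = y;
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] data = serialize(new Point(3, 4));
        // In a real cluster setup this would be the user-code class loader; here it is
        // simply the loader that defined Point.
        Point copy = (Point) deserialize(data, Point.class.getClassLoader());
        System.out.println(copy.x + "," + copy.y); // prints "3,4"
    }
}
```

In this single-JVM demo both loaders are the same, so it only shows the mechanism; the difference matters when the deserializing side runs in a process whose default loader does not contain the user's classes.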
Hi Stephan,

I confirm that I was using custom types in collect(), and that the bug is not present in master.

Thanks
Flavio

From: [hidden email] [mailto:[hidden email]] On Behalf Of Stephan Ewen

Hi Flavio! This issue is known and has been fixed already. It occurs when you use custom types in collect, because it uses the wrong classloader/serializer to transfer them. The current master should not have this issue any more.

Greetings,