Crash on DataSet.collect()

Flavio Baronti
Hello,

I'm testing the new DataSet.collect() method on version 0.9-milestone-1, but
I obtain the following error on cluster execution (no problem with local
execution), which also causes the job manager to crash:

14:05:41,145 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying CHAIN Cross(Cross at main(Test01.java:53)) -> Map (Map at main(Test01.java:54)) -> FlatMap (FlatMap at collect(DataSet.java:413)) (1/1) (attempt #0) to india3
14:05:41,211 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying DataSink (org.apache.flink.api.java.io.DiscardingOutputFormat@4386f16) (1/1) (attempt #0) to india3
14:05:41,269 INFO  org.apache.flink.runtime.jobmanager.JobManager - Status of job 254ba2f06f7a9c4d454ca7288dae4092 (Flink Java Job at Mon May 04 14:05:39 CEST 2015) changed to FINISHED .
14:05:41,284 ERROR akka.actor.OneForOneStrategy - java.io.StreamCorruptedException: invalid type code: 00
org.apache.commons.lang3.SerializationException: java.io.StreamCorruptedException: invalid type code: 00
        at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:232)
        at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:268)
        at org.apache.flink.api.common.accumulators.ListAccumulator.getLocalValue(ListAccumulator.java:51)
        at org.apache.flink.api.common.accumulators.ListAccumulator.getLocalValue(ListAccumulator.java:35)
        at org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager.getJobAccumulatorResults(AccumulatorManager.java:77)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:300)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:37)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:30)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:30)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:91)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.StreamCorruptedException: invalid type code: 00
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1379)
        at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1959)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1959)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:224)
        ... 24 more
14:05:41,290 INFO  org.apache.flink.runtime.jobmanager.JobManager - Stopping JobManager akka://flink/user/jobmanager#-828467473.
14:05:41,297 ERROR org.apache.flink.runtime.jobmanager.JobManager - Actor akka://flink/user/jobmanager#-828467473 terminated, stopping process...

Is this a known issue? Am I doing something wrong?

Thanks
Flavio



Re: Crash on DataSet.collect()

Stephan Ewen
Hi Flavio!

This issue is known and has been fixed already. It occurs when you use custom types in collect, because it uses the wrong classloader/serializer to transfer them.

The current master should not have this issue any more.

Greetings,
Stephan
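
The classloader problem described in this reply can be illustrated in plain Java. The thread does not show the actual Flink fix, so the following is only a minimal sketch of the general technique: deserializing with an ObjectInputStream that resolves classes through an explicitly supplied class loader instead of the default one. All names here (ClassLoaderAwareDeserialization, ClassLoaderObjectInputStream, Point) are hypothetical and are not Flink classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

// Hypothetical illustration only; this is not Flink code.
final class ClassLoaderAwareDeserialization {

    /** ObjectInputStream that resolves classes against a caller-supplied class loader. */
    static final class ClassLoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader classLoader;

        ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
            super(in);
            this.classLoader = classLoader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                // Look the class up in the supplied loader first.
                return Class.forName(desc.getName(), false, classLoader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default lookup, which also handles primitive types.
                return super.resolveClass(desc);
            }
        }
    }

    /** Hypothetical user-defined type standing in for a custom record type. */
    static final class Point implements Serializable {
        private static final long serialVersionUID = 1L;
        final int x;
        final int y;

        Point(int x, int y) {
            this.x = x;
            this.y = y;
        }
    }

    /** Serializes a value with standard Java serialization. */
    static byte[] serialize(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    /** Deserializes a value, resolving its classes through the given class loader. */
    static Object deserialize(byte[] data, ClassLoader classLoader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new ClassLoaderObjectInputStream(new ByteArrayInputStream(data), classLoader)) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Assumption: in a real cluster this would be the class loader that holds the job's classes.
        ClassLoader userLoader = ClassLoaderAwareDeserialization.class.getClassLoader();
        Point copy = (Point) deserialize(serialize(new Point(3, 4)), userLoader);
        System.out.println(copy.x + "," + copy.y);
    }
}
```

In this single-JVM sketch both class loaders are the same, so it only demonstrates the mechanism. The difference matters when user classes are visible to one loader only, which is presumably why the error appeared on the cluster and not in local execution.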



RE: Crash on DataSet.collect()

Flavio Baronti

Hi Stephan,

I confirm that I was using custom types in collect(), and that the bug is not present on the current master.

Thanks
Flavio
