Re: Objects deserialization on Jobmanager

Posted by Ventura Del Monte on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Objects-deserialization-on-Jobmanager-tp1104p1108.html

I am using Flink 0.9-SNAPSHOT. This is the complete stack trace:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Cannot initialize task 'DataSource (at <init>(DownpourSDG.java:28) (org.apache.flink.api.java.io.CollectionInputFormat))': Deserializing the InputFormat ([org.dl4flink.dl.neuralnets.models.AutoEncoder.AutoEncoderParam@352c308]) failed: unread block data
at org.apache.flink.client.program.Client.run(Client.java:378)
at org.apache.flink.client.program.Client.run(Client.java:314)
at org.apache.flink.client.program.Client.run(Client.java:307)
at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:89)
at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:82)
at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:70)
at org.dl4flink.DL4Flink.RunFlinkJob(DL4Flink.java:295)
at org.dl4flink.DL4Flink.main(DL4Flink.java:56)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSource (at <init>(DownpourSDG.java:28) (org.apache.flink.api.java.io.CollectionInputFormat))': Deserializing the InputFormat ([org.dl4flink.dl.neuralnets.models.AutoEncoder.AutoEncoderParam@352c308]) failed: unread block data
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$2.apply(JobManager.scala:527)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$2.apply(JobManager.scala:511)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:511)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:197)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:44)
at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:30)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:30)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:94)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Deserializing the InputFormat ([org.dl4flink.dl.neuralnets.models.AutoEncoder.AutoEncoderParam@352c308]) failed: unread block data
at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:60)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$2.apply(JobManager.scala:524)
... 25 more
Caused by: java.lang.IllegalStateException: unread block data
at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2424)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1383)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264)
at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282)
at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:57)
... 26 more


I checked the JobManager log and found that the object needed for the deserialization is null.


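import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
// JCuda (assumed from the cuMemAlloc / Sizeof usage)
import jcuda.Sizeof;
import static jcuda.driver.JCudaDriver.cuMemAlloc;

public void kryoDeserialize(Kryo kryo, Input in)
{
    this.rows = in.readInt();
    this.cols = in.readInt();
    this.size = this.rows * this.cols;
    double[] tmp = in.readDoubles(this.size); // host copy of the matrix values
    // Allocate GPU memory for the transient deviceData pointer; the (long) cast keeps
    // the byte count from overflowing int for large matrices.
    Core.CudaExecutor.invoke((handle) -> cuMemAlloc(this.deviceData, (long) this.size * Sizeof.DOUBLE)); // here CudaExecutor is null on the JobManager
}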

As you can see, deviceData is the transient field that I need to store and read in a specific way, since it is a pointer to GPU memory.
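For the Java-serialization counterpart of the Kryo method above, a transient field is usually handled with private writeObject/readObject hooks. A minimal sketch, assuming a hypothetical MatrixParam class that keeps its values in a host array: readObject reads back exactly what writeObject wrote and does no GPU work, so it needs nothing else from the system and the (hypothetical) deviceData field simply stays null.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical class for illustration; not the thread author's AutoEncoderParam.
public class MatrixParam implements Serializable {
    private static final long serialVersionUID = 1L;

    int rows;
    int cols;
    // Written by hand in writeObject below.
    transient double[] hostData;
    // Hypothetical stand-in for the GPU pointer; never serialized, null after reading.
    transient Object deviceData;

    MatrixParam(int rows, int cols, double[] hostData) {
        this.rows = rows;
        this.cols = cols;
        this.hostData = hostData;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject(); // writes rows and cols
        out.writeInt(hostData.length);
        for (double v : hostData) {
            out.writeDouble(v);
        }
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject(); // reads rows and cols
        int n = in.readInt();
        hostData = new double[n];
        for (int i = 0; i < n; i++) {
            hostData[i] = in.readDouble();
        }
        // No device allocation here: deviceData is left null until a worker needs it.
    }

    // Serializes and deserializes an instance, as happens when a job is shipped to the cluster.
    static MatrixParam roundTrip(MatrixParam p) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(p);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (MatrixParam) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }
}
```

Keeping the read side free of external dependencies means the same bytes can be read on any JVM, including one where the GPU helper objects were never created.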

The object I need to deserialize is part of a broadcast set. I think this is the reason why the JobManager needs to read it (I figured this out after I sent my first mail).
I am considering whether I should change my code to avoid this situation, since having the JobManager allocate GPU memory could be a drawback. What do you think?
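One way to keep the JobManager out of GPU work entirely is to deserialize only host data and allocate device memory lazily, the first time a task actually uses the object (for example from the open() method of the rich function that consumes the broadcast set). A minimal sketch, assuming a hypothetical LazyDeviceMatrix class and a LongUnaryOperator standing in for the real cuMemAlloc call; it is not thread-safe and assumes one instance per task.

```java
import java.io.Serializable;
import java.util.function.LongUnaryOperator;

// Hypothetical class for illustration only.
public class LazyDeviceMatrix implements Serializable {
    private static final long serialVersionUID = 1L;

    final int rows;
    final int cols;
    // Host-side values: the only data that travels through (de)serialization.
    final double[] hostData;
    // Hypothetical device handle (stands in for deviceData); 0 means "not allocated yet".
    private transient long devicePtr;

    LazyDeviceMatrix(int rows, int cols, double[] hostData) {
        if (hostData.length != rows * cols) {
            throw new IllegalArgumentException("hostData must hold rows * cols values");
        }
        this.rows = rows;
        this.cols = cols;
        this.hostData = hostData;
    }

    // Allocates on the first call only, i.e. on the worker that actually uses the matrix.
    // 'allocator' receives the size in bytes and returns a non-zero handle.
    long devicePointer(LongUnaryOperator allocator) {
        if (devicePtr == 0) {
            devicePtr = allocator.applyAsLong((long) rows * cols * Double.BYTES);
        }
        return devicePtr;
    }

    boolean isAllocated() {
        return devicePtr != 0;
    }
}
```

With this split, whatever the JobManager deserializes only ever contains plain arrays; the device pointer exists only in tasks that called devicePointer.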

Thank you for your time!


2015-04-22 16:48 GMT+02:00 Till Rohrmann <[hidden email]>:
The corresponding code snippet could also help.

Cheers,

Till

On Wed, Apr 22, 2015 at 4:45 PM, Robert Metzger <[hidden email]> wrote:
Hi,

which version of Flink are you using?

Can you send us the complete stack trace of the error to help us understand the exact location where the issue occurs?

On Wed, Apr 22, 2015 at 4:33 PM, Ventura Del Monte <[hidden email]> wrote:
Hello, I am working on a Flink-based deep learning library for my master's thesis, and I am currently facing the following issue. I have a Java class with a transient field, so I had to write both a custom Kryo serializer and a custom Java one. The (de)serialization needs to access another object of my system. When I run my software locally it works fine, because the needed object has already been instantiated; when I run it in a remote environment, however, it crashes, because the object needed for the deserialization does not exist on the JobManager when it receives the data. So my question is: is it possible to let the JobManager execute some user code, or would it be better to change the architecture of my system to avoid this kind of problem?
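If the deserializer really must call a shared helper object, one option is to let that helper initialize itself on whichever JVM first asks for it, using the initialization-on-demand holder idiom, so it can no longer be missing. A minimal sketch, assuming a hypothetical GpuExecutor class standing in for Core.CudaExecutor; note that this would make the JobManager create the executor too, which is the JobManager-side allocation drawback discussed in the thread.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class LazyExecutorDemo {

    // Hypothetical stand-in for Core.CudaExecutor; the real class is not shown in the thread.
    static final class GpuExecutor {
        static final AtomicInteger INSTANCES = new AtomicInteger();

        GpuExecutor() {
            // A real implementation would create its CUDA context here.
            INSTANCES.incrementAndGet();
        }

        void invoke(Runnable task) {
            task.run();
        }
    }

    // The JVM initializes Holder (and thus INSTANCE) lazily and thread-safely
    // the first time get() is called, on whichever JVM calls it.
    static final class ExecutorHolder {
        private ExecutorHolder() {}

        private static final class Holder {
            static final GpuExecutor INSTANCE = new GpuExecutor();
        }

        static GpuExecutor get() {
            return Holder.INSTANCE;
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        ExecutorHolder.get().invoke(() -> calls[0]++);
        ExecutorHolder.get().invoke(() -> calls[0]++);
        System.out.println(GpuExecutor.INSTANCES.get() + " executor, " + calls[0] + " calls"); // prints "1 executor, 2 calls"
    }
}
```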

Regards,
Ventura