Objects deserialization on Jobmanager

Objects deserialization on Jobmanager

Ventura Del Monte
Hello, I am working on a Flink-based deep learning library for my master's thesis, and I am currently facing the following issue: I have a Java class with a transient field, so I had to write both a custom Kryo serializer and a Java one. The (de)serialization needs to access another object of my system. If I run my software locally, it works fine, because the needed object has been instantiated; however, it crashes when I run it in a remote environment, because when the JobManager receives the data, the object needed for the deserialization is not present in the system. Thus, my question is whether it is possible to let the JobManager execute some user code, or whether it would be better to change the architecture of my system to avoid this kind of problem.

Regards,
Ventura

Re: Objects deserialization on Jobmanager

rmetzger0
Hi,

Which version of Flink are you using?

Can you send us the complete stack trace of the error to help us understand the exact location where the issue occurs?


Re: Objects deserialization on Jobmanager

Till Rohrmann
The corresponding code snippet could also help.

Cheers,

Till


Re: Objects deserialization on Jobmanager

Stephan Ewen
Hi Ventura!

On startup, Flink loads only the Flink classes; all user code classes are loaded dynamically. Whenever a user class is used, or a user class object is deserialized, Flink uses the user code classloader of that specific job.

For you as the user, this has the implication that if you try to load a class via "Class.forName", you need to supply the user code classloader as well.

You can grab the user code classloader in various ways:
1) Inside a UDF, you can grab it by making it a RichFunction and using "getRuntimeContext().getUserCodeClassLoader()"
2) At almost any other place, you can use "Thread.currentThread().getContextClassLoader()"
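
For illustration, here is a minimal sketch of option 1 (the mapper and the class name "org.example.MyUserType" are made up, not from your code):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class UserClassLoadingMapper extends RichMapFunction<String, String> {

    private transient Class<?> userClass;

    @Override
    public void open(Configuration parameters) throws Exception {
        // The JobManager/TaskManager JVMs only have the Flink classes on
        // their own classpath, so Class.forName must be given the job's
        // user code classloader explicitly.
        ClassLoader cl = getRuntimeContext().getUserCodeClassLoader();
        userClass = Class.forName("org.example.MyUserType", true, cl);
    }

    @Override
    public String map(String value) throws Exception {
        return userClass.getName() + ": " + value;
    }
}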

Hope that helps.

Greetings,
Stephan




Re: Objects deserialization on Jobmanager

Ventura Del Monte
I am using Flink 0.9-SNAPSHOT. This is the complete stack trace:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Cannot initialize task 'DataSource (at <init>(DownpourSDG.java:28) (org.apache.flink.api.java.io.CollectionInputFormat))': Deserializing the InputFormat ([org.dl4flink.dl.neuralnets.models.autoencoder.AutoEncoderParam@352c308]) failed: unread block data
at org.apache.flink.client.program.Client.run(Client.java:378)
at org.apache.flink.client.program.Client.run(Client.java:314)
at org.apache.flink.client.program.Client.run(Client.java:307)
at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:89)
at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:82)
at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:70)
at org.dl4flink.DL4Flink.RunFlinkJob(DL4Flink.java:295)
at org.dl4flink.DL4Flink.main(DL4Flink.java:56)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSource (at <init>(DownpourSDG.java:28) (org.apache.flink.api.java.io.CollectionInputFormat))': Deserializing the InputFormat ([org.dl4flink.dl.neuralnets.models.AutoEncoder.AutoEncoderParam@352c308]) failed: unread block data
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$2.apply(JobManager.scala:527)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$2.apply(JobManager.scala:511)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:511)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:197)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:44)
at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:30)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:30)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:94)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Deserializing the InputFormat ([org.dl4flink.dl.neuralnets.models.AutoEncoder.AutoEncoderParam@352c308]) failed: unread block data
at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:60)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$2.apply(JobManager.scala:524)
... 25 more
Caused by: java.lang.IllegalStateException: unread block data
at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2424)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1383)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264)
at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282)
at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:57)
... 26 more


I checked the JobManager log, and I know that the object needed for the deserialization is null.


public void kryoDeserialize(Kryo kryo, Input in)
{
    this.rows = in.readInt();
    this.cols = in.readInt();
    this.size = this.rows * this.cols;
    double[] tmp = in.readDoubles(this.size);
    Core.CudaExecutor.invoke((handle) -> cuMemAlloc(this.deviceData, this.size * Sizeof.DOUBLE)); // here CudaExecutor is null on JobManager
}

As you can see, deviceData is the transient field that I need to store/read in a specific way, since it is a pointer to GPU memory.

The object I need to deserialize is part of a broadcast set. I think this is the reason why the JobManager needs to read it (I figured that out after I sent my first mail).
I am wondering whether I should change my code to get rid of this situation, since having the JobManager allocate GPU memory could be a drawback. What do you think about that?
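
One way to get rid of it that I am considering: keep the deserializer GPU-free and allocate device memory lazily, only on the workers. A rough sketch (the hostData field, the allocated flag, and the ensureOnDevice method are hypothetical additions; the host-to-device copy is only indicated):

public void kryoDeserialize(Kryo kryo, Input in)
{
    this.rows = in.readInt();
    this.cols = in.readInt();
    this.size = this.rows * this.cols;
    this.hostData = in.readDoubles(this.size); // host-side copy only, no CUDA call
}

// called from a RichFunction, where CudaExecutor has been initialized in open()
public void ensureOnDevice()
{
    if (!this.allocated) {
        Core.CudaExecutor.invoke((handle) -> cuMemAlloc(this.deviceData, this.size * Sizeof.DOUBLE));
        // ...then copy hostData over to deviceData, e.g. via cuMemcpyHtoD
        this.allocated = true;
    }
}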

Thank you for your time!



Re: Objects deserialization on Jobmanager

Stephan Ewen
Hi Ventura!

You are distributing your data via something like "env.fromElements(...)" or "env.fromCollection(...)", is that correct?

The master node (JobManager) currently takes each InputFormat and checks whether it needs some "master-side initialization". For file input formats, for example, this computes the different splits of the file(s) that the parallel tasks will read.
For inputs like "env.fromElements(...)" or "env.fromCollection(...)" this is redundant, since there is nothing to coordinate; it is just that this initialization check happens for all inputs. It would be a good idea to skip it for collection inputs.

If you want to avoid this happening on the JobManager, the simplest way would be to make the data source independent of the CUDA types.

  - Define the source as a Tuple2 with the row and column dimensions:

    DataSet<Tuple2<Integer, Integer>> source = env.fromElements(new Tuple2<>(...), new Tuple2<>(...));

  - Transform the tuples into your CUDA types. Also, since that source is not parallel (java/scala collections are always run with
    parallelism 1), make sure you tell the system to go parallel after that:

    DataSet<GpuDataRegion> data = source.map( (tuple) -> { /* your initialization code */ } ).setParallelism(64);
    // the last statement makes sure the mapper runs with 64 parallel instances
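
Putting both steps together, a self-contained sketch (the dimensions and the "GpuDataRegion" type are illustrative):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// 1) GPU-free source: only the matrix dimensions pass through the
//    CollectionInputFormat, so the JobManager never deserializes CUDA state
DataSet<Tuple2<Integer, Integer>> dims =
        env.fromElements(new Tuple2<>(1024, 768), new Tuple2<>(2048, 2048));

// 2) build the CUDA-backed type in a parallel mapper, so that allocation
//    happens on the TaskManagers, where your executor is initialized
DataSet<GpuDataRegion> data = dims
        .map(t -> new GpuDataRegion(t.f0, t.f1))
        .setParallelism(64);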


Out of curiosity: The deserialization bug occurs here on the JobManager (because the JobManager looks into the Inputs), but I assume it would also occur on the TaskManagers (workers) once the proper execution starts?
How is Core.CudaExecutor usually initialized, so that it is not null when you need it?

Greetings,
Stephan



Re: Objects deserialization on Jobmanager

Ventura Del Monte
Hi Stephan!

First of all, thank you for your reply! You're right about how I distribute my data. I need this because I have an object that should be shared among tasks. I am currently working on decoupling this object from the CUDA type, and I will follow your suggestions!

About my CudaExecutor: it is a worker thread bound to a GPU CUDA context, and it follows the multi-producer/single-consumer pattern. It is initialized in the RichMapFunction.open method and on client startup; it was not supposed to run on the JobManager, and I did not expect that would happen, to be honest.

But I think I need to redesign my software architecture, because the CUDA worker could become a bottleneck at higher levels of parallelism. Moreover, the whole system would handle many context creations/destructions in the open/close methods. I was thinking of editing flink-runtime to make it aware of GPU resources: when the TaskManager spawns a new thread, it would initialize a CUDA context bound to one of the GPUs of the underlying hardware. I think this can easily be done in RuntimeEnvironment and in the instance.* classes (plus adding more configuration options). That would allow me to execute my DL library on heterogeneous multi-GPU clusters. I think it should work, and I would like to know your opinion about it, if you do not mind.

One remaining doubt: will Flink use the same thread to process tasks that are in the same slot? Thanks in advance.

Regards, 
Ventura



Re: Objects deserialization on Jobmanager

Stephan Ewen
Hi Ventura!

I hope you can get along without editing the Flink runtime by overriding the open/close methods of the RichFunctions and doing your initialization there.
They are called once in the life of a parallel function (except in iterations, where they are called once per superstep, but you can always check whether you are in the first superstep).
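
For example, a one-time setup can be guarded like this (sketch; getIterationRuntimeContext() is what rich functions see when they run inside an iteration):

@Override
public void open(Configuration parameters) {
    if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
        // first superstep only: do the one-time initialization here
    }
}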

For the threads: each parallel instance of a function has one thread, unless the functions are chained (you can see this in the log and WebUI, for example by an operator name like "Chain Source -> Map -> Combiner"). If parallel functions share a slot, they still each get a dedicated thread; they only divide the Flink-managed memory among themselves. Only the sorter has additional threads for asynchronous sorting/spilling, but those execute no user code.

Would it work if your data types (those that are exchanged between functions) did not carry any CUDA-resource-related types, but only the arrays? Then, inside your functions, you would create the CUDA types and again emit only the data arrays?
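
As a rough sketch of that pattern (all names are illustrative; "CudaExecutor" with its runOnGpu/shutdown methods stands in for your own helper):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class GpuMapper extends RichMapFunction<double[], double[]> {

    private transient CudaExecutor executor; // never serialized, never touched by the JobManager

    @Override
    public void open(Configuration parameters) {
        // one CUDA context / worker per parallel instance
        executor = new CudaExecutor();
    }

    @Override
    public double[] map(double[] hostArray) {
        // upload, compute, download inside the function; only the plain
        // double[] is emitted, so Flink never sees a device pointer
        return executor.runOnGpu(hostArray);
    }

    @Override
    public void close() {
        executor.shutdown(); // release the context
    }
}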

Greetings,
Stephan

