java.io.NotSerializableException when executing my program on 0.7 snapshot


Stefan Bunk
Hi Flinkers,

While upgrading to the latest 0.7 snapshot (Scala API) for some features I need, I ran into the following error (the key part is java.io.NotSerializableException: my.example.FlinkProgram):

Exception in thread "main" org.apache.flink.compiler.CompilerException: Error translating node 'Map "org.apache.flink.api.scala.DataSet$$anon$1" : MAP [[ GlobalProperties [partitioning=RANDOM] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: my.example.FlinkProgram
at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:338)
at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:95)
        [many more lines of stacktrace ..]

Questions:
 - Why does Flink need to serialize that class? The class only consists of some methods that together build my execution graph and several data sinks. It does not even have state yet.
 - How can I make it serializable? I tried extending Serializable, but that did not work.
 - What happens once I add state that I do not want to serialize?

A quick side question: did you remove the count() method for grouped data sets in the 0.7 release on purpose? If so, is there a shortcut for counting a grouping without writing the reduceGroup function by hand?

Thanks in advance
Stefan

Re: java.io.NotSerializableException when executing my program on 0.7 snapshot

Stephan Ewen
Hi Stefan!

Flink does not really need to serialize that class; it only needs to serialize the UDFs / lambdas / functions.

The problem is that, depending on how the lambda / UDF is written, Scala/Java puts the enclosing class into the closure and tries to serialize it as well. We are working on code that automatically removes these unnecessary parts.

Until then, the solution would be to make sure that the surrounding class does not become part of the closure.
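
To illustrate the point about closures, here is a minimal sketch that uses only plain Java serialization and no Flink classes. The names MyProgram, capturing, nonCapturing and ClosureCheck are hypothetical and do not come from the thread. The sketch assumes a reasonably recent Scala compiler, where function literals are serializable by default. It contrasts a function that reads a field of its enclosing class (and therefore holds a reference to the whole instance) with one that first copies the field into a local val.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a program class; deliberately NOT Serializable.
class MyProgram {
  val threshold = 10

  // Reads the field `threshold`, i.e. `this.threshold`, so the function
  // object keeps a reference to the enclosing MyProgram instance.
  def capturing: Int => Boolean = x => x > threshold

  // Copies the field into a local val first, so the function only
  // captures an Int and not the enclosing instance.
  def nonCapturing: Int => Boolean = {
    val localThreshold = threshold
    x => x > localThreshold
  }
}

object ClosureCheck {
  // Returns true if plain Java serialization accepts the object.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val program = new MyProgram
    println(isSerializable(program.capturing))
    println(isSerializable(program.nonCapturing))
  }
}
```

Under these assumptions the first check should fail, because MyProgram is dragged into the closure, and the second should succeed. Exact capture behaviour can differ between compiler versions, so this is best treated as a diagnostic to run against your own setup rather than a guarantee.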


Greetings,
Stephan



(Reply to Stefan Bunk's post of Fri, Oct 17, 2014, 4:59 PM.)

Re: java.io.NotSerializableException when executing my program on 0.7 snapshot

Stefan Bunk
Hi,

I got it working now:
 - It seems that even the simplest local boolean variable (not an instance variable) referenced in a closure inside a method leads Java to put the outer class into the closure.
 - I had defined some case classes inside my methods; they caused the same issue, so I moved them to another class.
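
As a rough sketch of the layout described in the second point, the example below declares a case class at the top level and keeps the function in a standalone object, so nothing has an enclosing program instance to capture. WordCount, Functions and RoundTrip are hypothetical names chosen for illustration, and the check again uses plain Java serialization rather than Flink itself.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical record type, declared at the top level instead of inside a
// method, so instances carry no reference to an enclosing program class.
case class WordCount(word: String, count: Int)

// Functions kept in a standalone object: there is no enclosing instance.
object Functions {
  val toWordCount: String => WordCount = w => WordCount(w, 1)
}

object RoundTrip {
  // Serializes the object to a byte array and reads it back.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    // Both the function and the record should survive serialization.
    val restored = roundTrip(Functions.toWordCount)
    println(restored("flink"))
    println(roundTrip(WordCount("flink", 2)))
  }
}
```

Whether a case class declared inside a method keeps a reference to its enclosing instance depends on the compiler version, which is why the sketch only shows the top-level arrangement that sidesteps the question.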

Greetings
Stefan 

(Reply to Stephan Ewen's post of Fri, Oct 17, 2014, 5:42 PM.)