Hello,
I've implemented a (streaming) flow using the Java API and Java8 Lambdas for various map functions. When I try to run the flow, job submission fails because of an unserializable type. This is not a type of data used within the flow, but rather a small collection of objects captured in the closure context over one of my Lambdas. I've implemented and registered a Kryo Serializer for this type with this environment, however, it's apparently not used when serializing the lambdas. Seems like the same serialization configuration and tools of the environment should be used when preparing the job for submission. Am I missing something? Thanks, Nick org.apache.flink.client.program.ProgramInvocationException: The main method caused an error. at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:512) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395) at org.apache.flink.client.program.Client.runBlocking(Client.java:252) at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:675) at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326) at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:977) at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1027) Caused by: org.apache.flink.api.common.InvalidProgramException: Object ImportFlow$$Lambda$11/1615389290@44286963 not serializable at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97) at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1228) at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:149) at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:550) at ImportFlow.assembleImportFlow(ImportFlow.java:111) at ImportFlow.main(ImportFlow.java:178) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497) ... 
6 more Caused by: java.io.NotSerializableException: org.apache.phoenix.util.ColumnInfo at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at java.util.ArrayList.writeObject(ArrayList.java:762) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:307) at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95) ... 17 more |
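For reference, here is a minimal sketch of the kind of code that produces this error. The real ImportFlow.assembleImportFlow() is not shown in the thread, so the signature and body below are assumptions reconstructed from the stack trace:

    import java.util.List;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.phoenix.util.ColumnInfo;

    public class ImportFlow {

        // Hypothetical signature; the actual method is not part of the thread.
        static DataStream<String> assembleImportFlow(StreamExecutionEnvironment env,
                                                     DataStream<String> input,
                                                     List<ColumnInfo> columns) {
            // The lambda captures 'columns'. DataStream.map() invokes the ClosureCleaner,
            // which Java-serializes the lambda and fails with NotSerializableException
            // because ColumnInfo does not implement java.io.Serializable. A Kryo serializer
            // registered on 'env' only applies to records flowing through the job, not to
            // the closure shipped at submission time.
            return input.map(value -> value + "/" + columns.size());
        }
    }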
Hi Nick,

At the moment Flink uses Java serialization to ship the UDFs to the cluster. Therefore, the closures must only contain serializable objects.

Cheers,
Till

On Mon, Dec 7, 2015 at 10:02 PM, Nick Dimiduk <[hidden email]> wrote:
That's what I feared. IMO this is very limiting when mixing in other projects, where a user does not have control over those projects' APIs. At least falling back to an extensible serialization mechanism (like Kryo) allows users to register serializers external to the types they're consuming.

I opened https://issues.apache.org/jira/browse/FLINK-3148 for this issue.

-n

On Tue, Dec 8, 2015 at 1:37 AM, Till Rohrmann <[hidden email]> wrote:
Hi Nick,

Thanks for pushing this and opening the JIRA issue. So far, the workaround of marking member variables as transient and initializing them in the open() method of a RichFunction has been good enough for all the cases I am aware of. That's probably why the issue hasn't been addressed yet. Of course, this is not a satisfying solution if you would like to use Java 8 lambda functions.

Best,
Fabian

2015-12-08 19:38 GMT+01:00 Nick Dimiduk <[hidden email]>:
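A minimal sketch of the transient/open() workaround Fabian describes, applied to the ColumnInfo case from the original post. The lookupColumns() helper is a placeholder, not something from the thread:

    import java.util.List;

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.phoenix.util.ColumnInfo;

    public class ColumnAwareMapper extends RichMapFunction<String, String> {

        // Only serializable state (plain column names) is shipped with the function.
        private final List<String> columnNames;

        // Transient, so Java serialization skips it; rebuilt on the worker in open().
        private transient List<ColumnInfo> columns;

        public ColumnAwareMapper(List<String> columnNames) {
            this.columnNames = columnNames;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            columns = lookupColumns(columnNames);
        }

        @Override
        public String map(String value) {
            return value + "/" + columns.size();
        }

        // Placeholder for however the application actually derives its ColumnInfo
        // objects on the worker, e.g. from Phoenix table metadata.
        private List<ColumnInfo> lookupColumns(List<String> names) {
            throw new UnsupportedOperationException("application-specific lookup");
        }
    }

With input.map(new ColumnAwareMapper(columnNames)) in place of the capturing lambda, nothing non-serializable ends up in the shipped function.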
Ah, very good. I've closed my issue as a duplicate. Thanks for the reference.

On Tue, Dec 8, 2015 at 11:23 AM, Fabian Hueske <[hidden email]> wrote:
Actually, this should be independent of Java 8 lambdas vs. Java 7 anonymous classes. I have been using Java 8 lambdas quite a bit with Flink. The important thing is that no non-serializable objects are in the closure. As Fabian mentioned, lazy initialization helps.

Serializability is also discussed here: http://stackoverflow.com/questions/34118469/flink-using-dagger-injections-not-serializable

Adding another serialization framework may help in cases where only the java.io.Serializable interface is missing from an object. However, not everything is magically serializable with Kryo. There are classes that you can serialize with Java serialization but not out of the box with Kryo (especially when immutable collections are involved). Classes that have no default constructor but enforce invariants, etc., can also fail arbitrarily with Kryo.

On Tue, Dec 8, 2015 at 8:28 PM, Nick Dimiduk <[hidden email]> wrote:
The point is to provide a means for users to work around non-conforming APIs. Kryo at least is extensible, in that you can register additional serializers.

On Tue, Dec 8, 2015 at 11:40 AM, Stephan Ewen <[hidden email]> wrote:
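A short sketch of what such a registration looks like for the ColumnInfo case. The serializer body is illustrative and assumes ColumnInfo's toString()/fromString() round-trip; adapt it to whatever encoding your Phoenix version actually provides:

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.Serializer;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.phoenix.util.ColumnInfo;

    public class KryoRegistrationExample {

        // Illustrative serializer: encodes a ColumnInfo via its string representation.
        public static class ColumnInfoSerializer extends Serializer<ColumnInfo> {
            @Override
            public void write(Kryo kryo, Output output, ColumnInfo info) {
                output.writeString(info.toString());
            }

            @Override
            public ColumnInfo read(Kryo kryo, Input input, Class<ColumnInfo> type) {
                // Assumes ColumnInfo.fromString() parses the toString() form.
                return ColumnInfo.fromString(input.readString());
            }
        }

        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Registers the serializer for records flowing through the job. As discussed
            // above, this registration is not consulted when the closure itself is shipped.
            env.getConfig().registerTypeWithKryoSerializer(ColumnInfo.class, ColumnInfoSerializer.class);
        }
    }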
That is true. If you want to look into this, there are probably two places that need adjustment:

1) The UserCodeObjectWrapper would need to be adjusted to hold a serialized object (a byte[]) for shipping, and to serialize that object differently (say, trying Java first, then falling back to Kryo).

2) The ClosureCleaner, which also checks serializability, should then no longer check eagerly for serializability.

After that, we could think about exposing a way to register custom serializers with Kryo for the UserCodeObjectWrapper.

Greetings,
Stephan

On Tue, Dec 8, 2015 at 8:44 PM, Nick Dimiduk <[hidden email]> wrote:
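A rough sketch of the "try Java, then fall back to Kryo" idea from point 1), purely to illustrate the control flow. It is not how UserCodeObjectWrapper works today, and in practice Kryo would also need additional setup to cope with lambdas and captured closures:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.NotSerializableException;
    import java.io.ObjectOutputStream;

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.io.Output;

    public final class UdfShippingSketch {

        // Serialize a user function for shipping: Java serialization first,
        // Kryo as a fallback for closures that are not java.io.Serializable.
        public static byte[] serializeUserFunction(Object udf) throws IOException {
            try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                 ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(udf);
                out.flush();
                return bytes.toByteArray();
            } catch (NotSerializableException e) {
                Kryo kryo = new Kryo();
                // User-registered custom serializers could be added to 'kryo' here.
                try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                     Output output = new Output(bytes)) {
                    kryo.writeClassAndObject(output, udf);
                    output.flush();
                    return bytes.toByteArray();
                }
            }
        }
    }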