Using Kryo for serializing lambda closures

Using Kryo for serializing lambda closures

Nick Dimiduk
Hello,

I've implemented a (streaming) flow using the Java API and Java 8 lambdas for various map functions. When I try to run the flow, job submission fails because of an unserializable type. This is not a type of data used within the flow, but rather a small collection of objects captured in the closure context of one of my lambdas. I've implemented and registered a Kryo Serializer for this type with the environment; however, it's apparently not used when serializing the lambdas. It seems like the same serialization configuration and tools of the environment should be used when preparing the job for submission. Am I missing something?

Thanks,
Nick

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:512)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
        at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:675)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:977)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1027)
Caused by: org.apache.flink.api.common.InvalidProgramException: Object ImportFlow$$Lambda$11/1615389290@44286963 not serializable
        at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1228)
        at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:149)
        at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:550)
        at ImportFlow.assembleImportFlow(ImportFlow.java:111)
        at ImportFlow.main(ImportFlow.java:178)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
        ... 6 more
Caused by: java.io.NotSerializableException: org.apache.phoenix.util.ColumnInfo
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at java.util.ArrayList.writeObject(ArrayList.java:762)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:307)
        at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)
        ... 17 more

Re: Using Kryo for serializing lambda closures

Till Rohrmann

Hi Nick,

at the moment Flink uses Java serialization to ship the UDFs to the cluster. Therefore, the closures must only contain Serializable objects. The serializer registration only applies to the data which is processed by the Flink job. Thus, for the moment I would try to get rid of the ColumnInfo object in your closure.

Cheers,
Till
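
The behaviour described above can be reproduced with the JDK alone. The following is a minimal sketch, not Flink code: `ColumnMeta` is a hypothetical stand-in for a third-party class such as ColumnInfo, and `SerializableFunction` is an assumed interface that merely makes lambdas eligible for Java serialization. It shows that a lambda is rejected as soon as its closure holds a list of non-serializable objects, while a lambda that captures nothing goes through.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class ClosureSerializationDemo {

    // Hypothetical stand-in for a third-party type that does not implement Serializable.
    static class ColumnMeta {
        final String name;
        ColumnMeta(String name) { this.name = name; }
    }

    // Assumed helper interface: lambdas targeting it can be written with Java serialization.
    interface SerializableFunction<T, R> extends Function<T, R>, Serializable {}

    // Returns null on success, or the message of the NotSerializableException on failure.
    static String trySerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // The returned lambda captures the list, so the list becomes part of the closure.
    static SerializableFunction<String, String> capturing(List<ColumnMeta> columns) {
        return value -> value + ":" + columns.size();
    }

    // The returned lambda captures nothing.
    static SerializableFunction<String, String> nonCapturing() {
        return value -> value.trim();
    }

    public static void main(String[] args) {
        List<ColumnMeta> columns = new ArrayList<>();
        columns.add(new ColumnMeta("id"));
        System.out.println("capturing lambda rejected: " + trySerialize(capturing(columns)));
        System.out.println("non-capturing lambda rejected: " + trySerialize(nonCapturing()));
    }
}
```

For the capturing lambda, the failure surfaces while the captured ArrayList writes its elements, which matches the `ArrayList.writeObject` frame in the stack trace of the original post.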


On Mon, Dec 7, 2015 at 10:02 PM, Nick Dimiduk <[hidden email]> wrote:

Re: Using Kryo for serializing lambda closures

Nick Dimiduk-2
That's what I feared. IMO this is very limiting when mixing in other projects where a user does not have control over those projects' APIs. At least falling back to an extensible serialization mechanism (like Kryo) allows users to register serializers external to the types they're consuming.


-n

On Tue, Dec 8, 2015 at 1:37 AM, Till Rohrmann <[hidden email]> wrote:


Re: Using Kryo for serializing lambda closures

Fabian Hueske-2
Hi Nick,

thanks for pushing this and opening the JIRA issue.

The issue has come up a couple of times and is a known limitation (see FLINK-1256).
So far the workaround of marking member variables as transient and initializing them in the open() method of a RichFunction has been good enough for all cases I am aware of. That's probably why the issue hasn't been addressed yet.

Of course, this is not a satisfying solution if you would like to use Java 8 lambda functions.

Best, Fabian
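
The transient-plus-open() workaround mentioned above can be sketched without Flink on the classpath. This is a plain-Java analog under stated assumptions: `ColumnCountMapper`, `ColumnMeta` and `roundTrip` are hypothetical names, and the `open()` method here only imitates the lifecycle hook of a RichFunction. In a real job the class would presumably extend one of Flink's rich function classes and override its `open` method instead.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TransientOpenDemo {

    // Hypothetical stand-in for a non-serializable third-party type.
    static class ColumnMeta {
        final String name;
        ColumnMeta(String name) { this.name = name; }
    }

    // Only the serializable configuration travels with the object;
    // the non-serializable state is rebuilt in open().
    static class ColumnCountMapper implements Serializable {
        private static final long serialVersionUID = 1L;

        private final List<String> columnNames;      // serializable configuration
        private transient List<ColumnMeta> columns;  // skipped by Java serialization

        ColumnCountMapper(List<String> columnNames) {
            this.columnNames = new ArrayList<>(columnNames);
        }

        // Imitates the hook that runs once on the worker before records are processed.
        void open() {
            columns = new ArrayList<>();
            for (String name : columnNames) {
                columns.add(new ColumnMeta(name));
            }
        }

        String map(String value) {
            return value + ":" + columns.size();
        }
    }

    // Serializes and deserializes an object, imitating shipping it to a worker.
    static <T extends Serializable> T roundTrip(T object) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(object);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ColumnCountMapper shipped = roundTrip(new ColumnCountMapper(Arrays.asList("id", "name")));
        shipped.open();                          // rebuild the transient state after shipping
        System.out.println(shipped.map("row"));  // prints "row:2"
    }
}
```

The design choice is that the object carries only what is needed to recreate the awkward type (here, column names), never the type itself.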

2015-12-08 19:38 GMT+01:00 Nick Dimiduk <[hidden email]>:

Re: Using Kryo for serializing lambda closures

Nick Dimiduk
Ah, very good. I've closed my issue as a duplicate. Thanks for the reference.

On Tue, Dec 8, 2015 at 11:23 AM, Fabian Hueske <[hidden email]> wrote:

Re: Using Kryo for serializing lambda closures

Stephan Ewen
Actually, this should be independent of Java 8 lambdas vs Java 7 anonymous classes.
I have been using Java 8 lambdas quite a bit with Flink.

The important thing is that no non-serializable objects are in the closure.

As Fabian mentioned, lazy initialization helps. Serializability is also discussed here: http://stackoverflow.com/questions/34118469/flink-using-dagger-injections-not-serializable
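
Lazy initialization also works with lambdas, as long as the closure itself holds only serializable objects. The sketch below is one possible shape, using JDK classes only; `LazyColumns`, `ColumnMeta`, `SerializableFunction` and `roundTrip` are hypothetical names introduced for illustration. The lambda captures a small serializable holder, and the holder rebuilds the non-serializable objects on first use after deserialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class LazyClosureDemo {

    // Hypothetical stand-in for a non-serializable third-party type.
    static class ColumnMeta {
        final String name;
        ColumnMeta(String name) { this.name = name; }
    }

    // Assumed helper interface that makes lambdas serializable.
    interface SerializableFunction<T, R> extends Function<T, R>, Serializable {}

    // Serializable holder: ships the names, rebuilds the ColumnMeta objects lazily.
    static class LazyColumns implements Serializable {
        private static final long serialVersionUID = 1L;

        private final ArrayList<String> names;
        private transient List<ColumnMeta> columns;  // null again after deserialization

        LazyColumns(List<String> names) {
            this.names = new ArrayList<>(names);
        }

        // Not synchronized: this sketch assumes single-threaded use per instance.
        List<ColumnMeta> get() {
            if (columns == null) {
                List<ColumnMeta> built = new ArrayList<>();
                for (String name : names) {
                    built.add(new ColumnMeta(name));
                }
                columns = built;
            }
            return columns;
        }
    }

    // The lambda captures only the serializable holder, never a ColumnMeta.
    static SerializableFunction<String, String> mapper(List<String> names) {
        LazyColumns lazy = new LazyColumns(names);
        return value -> value + ":" + lazy.get().get(0).name;
    }

    // Serializes and deserializes an object, imitating shipping it to a worker.
    static <T extends Serializable> T roundTrip(T object) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(object);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Function<String, String> shipped = roundTrip(mapper(Arrays.asList("id", "name")));
        System.out.println(shipped.apply("row"));  // prints "row:id"
    }
}
```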

Adding another serialization framework may help for cases where simply the java.io.Serializable interface is missing in an object. However, not everything is magically serializable with Kryo.
There are classes that you can serialize with Java Serialization, but not out of the box with Kryo (especially when immutable collections are involved). Also, classes that have no default constructor but check invariants, etc., can fail with Kryo arbitrarily.
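
To illustrate only the Java serialization half of that remark, here is a small JDK-only sketch. `PositiveRange` and `roundTrip` are hypothetical names. The class has no no-argument constructor and validates its arguments, yet it round-trips through Java serialization, because that mechanism restores the fields of a Serializable class without calling the class's own constructor. How Kryo treats such a class depends on its configuration, which this sketch does not exercise.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class NoDefaultConstructorDemo {

    // Immutable value with an invariant check and no no-argument constructor.
    static final class PositiveRange implements Serializable {
        private static final long serialVersionUID = 1L;

        final int low;
        final int high;

        PositiveRange(int low, int high) {
            if (low < 0 || high < low) {
                throw new IllegalArgumentException("invalid range");
            }
            this.low = low;
            this.high = high;
        }
    }

    // Serializes and deserializes an object with plain Java serialization.
    static <T extends Serializable> T roundTrip(T object) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(object);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        PositiveRange copy = roundTrip(new PositiveRange(1, 5));
        System.out.println(copy.low + ".." + copy.high);  // prints "1..5"
    }
}
```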



On Tue, Dec 8, 2015 at 8:28 PM, Nick Dimiduk <[hidden email]> wrote:
Ah, very good. I've closed my issue as a duplicate. Thanks for the reference.

On Tue, Dec 8, 2015 at 11:23 AM, Fabian Hueske <[hidden email]> wrote:
Hi Nick,

thanks for pushing this and opening the JIRA issue.

The issue came up a couple of times and a known limitation (see FLINK-1256).
So far the workaround of marking member variables as transient and initializing them in the open() method of a RichFunction has been good enough for all cases I am aware of. That's probably why the issue hasn't been addressed yet.

Of course this is not a satisfying solution, if you would like to use Java 8 lambda functions.

Best, Fabian

2015-12-08 19:38 GMT+01:00 Nick Dimiduk <[hidden email]>:
That's what I feared. IMO this is very limiting when mixing in other projects where a user does not have control over those projects' APIs. At least falling back to an extensible serialization mechanism (like Kryo) allows users to register serializers external to the types they're consuming.


-n

On Tue, Dec 8, 2015 at 1:37 AM, Till Rohrmann <[hidden email]> wrote:

Hi Nick,

at the moment Flink uses Java serialization to ship the UDFs to the cluster. Therefore, the closures must only contain Serializable objects. The serializer registration only applies to the data which is processed by the Flink job. Thus, for the moment I would try to get rid of the ColumnInfo object in your closure.

Cheers,
Till


On Mon, Dec 7, 2015 at 10:02 PM, Nick Dimiduk <[hidden email]> wrote:
Hello,

I've implemented a (streaming) flow using the Java API and Java8 Lambdas for various map functions. When I try to run the flow, job submission fails because of an unserializable type. This is not a type of data used within the flow, but rather a small collection of objects captured in the closure context over one of my Lambdas. I've implemented and registered a Kryo Serializer for this type with this environment, however, it's apparently not used when serializing the lambdas. Seems like the same serialization configuration and tools of the environment should be used when preparing the job for submission. Am I missing something?

Thanks,
Nick

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:512)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
        at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:675)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:977)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1027)
Caused by: org.apache.flink.api.common.InvalidProgramException: Object ImportFlow$$Lambda$11/1615389290@44286963 not serializable
        at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1228)
        at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:149)
        at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:550)
        at ImportFlow.assembleImportFlow(ImportFlow.java:111)
        at ImportFlow.main(ImportFlow.java:178)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
        ... 6 more
Caused by: java.io.NotSerializableException: org.apache.phoenix.util.ColumnInfo
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at java.util.ArrayList.writeObject(ArrayList.java:762)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:307)
        at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)
        ... 17 more






Re: Using Kryo for serializing lambda closures

Nick Dimiduk
The point is to provide a means for users to work around nonconforming APIs. Kryo at least is extensible in that you can register additional serializers.

On Tue, Dec 8, 2015 at 11:40 AM, Stephan Ewen <[hidden email]> wrote:
Actually, this should be independent of Java 8 lambdas vs Java 7 anonymous classes.
I have been using Java 8 lambdas quite a bit with Flink.

The important thing is that no non-serializable objects are in the closure.

As Fabian mentioned, lazy initialization helps. Serializability is also discussed here: http://stackoverflow.com/questions/34118469/flink-using-dagger-injections-not-serializable

Adding another serialization framework may help in cases where an object is simply missing the java.io.Serializable interface. However, not everything is magically serializable with Kryo.
There are classes that you can serialize with Java Serialization, but not out of the box with Kryo (especially when immutable collections are involved). Also, classes that have no default constructor but do check invariants, etc., can fail with Kryo arbitrarily.



On Tue, Dec 8, 2015 at 8:28 PM, Nick Dimiduk <[hidden email]> wrote:
Ah, very good. I've closed my issue as a duplicate. Thanks for the reference.

On Tue, Dec 8, 2015 at 11:23 AM, Fabian Hueske <[hidden email]> wrote:
Hi Nick,

thanks for pushing this and opening the JIRA issue.

The issue has come up a couple of times and is a known limitation (see FLINK-1256).
So far the workaround of marking member variables as transient and initializing them in the open() method of a RichFunction has been good enough for all cases I am aware of. That's probably why the issue hasn't been addressed yet.

Of course this is not a satisfying solution if you would like to use Java 8 lambda functions.
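
For reference, a minimal sketch of the transient-plus-open() workaround mentioned above, written in plain Java so it runs without Flink on the classpath. ColumnMeta and ColumnMapper are hypothetical stand-ins (for a non-serializable third-party type and for a rich function); in a real job the class would presumably extend one of Flink's rich function classes and do this initialization in its open() method.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientWorkaround {

    // Hypothetical stand-in for a third-party type that is not Serializable.
    static class ColumnMeta {
        final String name;
        ColumnMeta(String name) { this.name = name; }
    }

    // Mimics the shape of a rich function: only serializable configuration is
    // shipped, and the non-serializable helper is rebuilt in open().
    static class ColumnMapper implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String columnName;    // serializable configuration
        private transient ColumnMeta meta;  // skipped by Java serialization

        ColumnMapper(String columnName) { this.columnName = columnName; }

        // Called once on the receiving side before any records are processed.
        void open() { meta = new ColumnMeta(columnName); }

        boolean isOpen() { return meta != null; }

        String map(String value) { return meta.name + "=" + value; }
    }

    // Serializes and deserializes an object with plain Java serialization,
    // roughly what happens when a function is shipped to another JVM.
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ColumnMapper shipped = roundTrip(new ColumnMapper("id"));
        System.out.println(shipped.isOpen()); // transient field was not shipped
        shipped.open();                       // rebuild it on the receiving side
        System.out.println(shipped.map("42"));
    }
}
```

The trade-off is the one noted above: this needs a named class with a field and an initialization hook, which a bare lambda cannot provide.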

Best, Fabian

2015-12-08 19:38 GMT+01:00 Nick Dimiduk <[hidden email]>:
That's what I feared. IMO this is very limiting when mixing in other projects where a user does not have control over those projects' APIs. At least falling back to an extensible serialization mechanism (like Kryo) allows users to register serializers external to the types they're consuming.


-n

On Tue, Dec 8, 2015 at 1:37 AM, Till Rohrmann <[hidden email]> wrote:

Hi Nick,

at the moment Flink uses Java serialization to ship the UDFs to the cluster. Therefore, the closures must only contain Serializable objects. The serializer registration only applies to the data which is processed by the Flink job. Thus, for the moment I would try to get rid of the ColumnInfo object in your closure.
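
The failure can be reproduced without Flink. The sketch below is only an illustration under assumptions: ColumnMeta is a hypothetical stand-in for a non-serializable type such as ColumnInfo, and isJavaSerializable is a made-up helper that performs the same kind of check with plain Java serialization. A lambda that captures the object is rejected, while a lambda that captures only a String extracted from it is accepted.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Function;

public class ClosureCheck {

    // Hypothetical stand-in for a third-party type that is not Serializable.
    static class ColumnMeta {
        final String name;
        ColumnMeta(String name) { this.name = name; }
    }

    // A function type that is also Serializable, so lambdas targeting it
    // can be written with Java serialization.
    interface SerializableFunction<A, B> extends Function<A, B>, Serializable {}

    // Returns true if plain Java serialization accepts the object.
    static boolean isJavaSerializable(Object o) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // The lambda captures the ColumnMeta instance itself.
    static SerializableFunction<String, String> capturingObject() {
        ColumnMeta meta = new ColumnMeta("id");
        return value -> meta.name + "=" + value;
    }

    // The lambda captures only a String taken from the ColumnMeta.
    static SerializableFunction<String, String> capturingString() {
        String columnName = new ColumnMeta("id").name;
        return value -> columnName + "=" + value;
    }

    public static void main(String[] args) {
        System.out.println(isJavaSerializable(capturingObject())); // false
        System.out.println(isJavaSerializable(capturingString())); // true
    }
}
```

Extracting the needed values into serializable locals before building the lambda is one way to "get rid of" the object in the closure when only a few of its fields are used.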

Cheers,
Till


On Mon, Dec 7, 2015 at 10:02 PM, Nick Dimiduk <[hidden email]> wrote:
Hello,

I've implemented a (streaming) flow using the Java API and Java8 Lambdas for various map functions. When I try to run the flow, job submission fails because of an unserializable type. This is not a type of data used within the flow, but rather a small collection of objects captured in the closure context over one of my Lambdas. I've implemented and registered a Kryo Serializer for this type with this environment, however, it's apparently not used when serializing the lambdas. Seems like the same serialization configuration and tools of the environment should be used when preparing the job for submission. Am I missing something?

Thanks,
Nick








Re: Using Kryo for serializing lambda closures

Stephan Ewen
That is true.

If you want to look into this, there are probably two places that need adjustment:

1) The UserCodeObjectWrapper would need to be adjusted to hold a serialized object (a byte[]) for shipping and to serialize that object differently (say, trying Java first, then falling back to Kryo).

2) The ClosureCleaner, which also checks serializability, should then no longer check eagerly for serializability.

After that, we could think about exposing a way to register custom serializers with Kryo for the UserCodeObjectWrapper.
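
To make the idea concrete, here is a rough sketch of the "try Java, then fall back" step in plain Java. It is not Flink code: SerializedUserCode, FallbackSerializer and wrap are hypothetical names, and the fallback is left as an interface where a Kryo-based implementation could be plugged in.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class FallbackWrapperSketch {

    // Hypothetical hook standing in for a Kryo-based serializer;
    // this is not a Flink or Kryo API.
    interface FallbackSerializer {
        byte[] serialize(Object obj);
    }

    // Holds user code as bytes for shipping and records which mechanism
    // produced them, so the receiver knows how to decode the payload.
    static final class SerializedUserCode implements Serializable {
        private static final long serialVersionUID = 1L;
        final byte[] payload;
        final boolean usedFallback;

        SerializedUserCode(byte[] payload, boolean usedFallback) {
            this.payload = payload;
            this.usedFallback = usedFallback;
        }
    }

    // Tries Java serialization first; if the object graph contains a
    // non-serializable object, hands the whole object to the fallback.
    static SerializedUserCode wrap(Object userCode, FallbackSerializer fallback) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(userCode);
            }
            return new SerializedUserCode(bytes.toByteArray(), false);
        } catch (NotSerializableException e) {
            return new SerializedUserCode(fallback.serialize(userCode), true);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Toy fallback for demonstration only; a real one would delegate to Kryo.
        FallbackSerializer toy =
            obj -> obj.getClass().getName().getBytes(StandardCharsets.UTF_8);

        System.out.println(wrap("hello", toy).usedFallback);      // false
        System.out.println(wrap(new Object(), toy).usedFallback); // true
    }
}
```

The receiving side would need the matching decode path, and with this design an eager serializability check would have to accept anything that either mechanism can handle.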

Greetings,
Stephan


On Tue, Dec 8, 2015 at 8:44 PM, Nick Dimiduk <[hidden email]> wrote:
The point is to provide a means for users to work around nonconforming APIs. Kryo at least is extensible in that you can register additional serializers.
