Working with protobuf wrappers

Krzysztof Zarzycki
Hi! 
I'm trying to use Protobuf wrapper classes generated by protoc and pass them as objects between Flink functions. I'm using Flink 0.10.0. 
Unfortunately, I get an exception at runtime: 

[...]
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
enrichments_ (com.company$MyObject)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:162)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:313)
... 11 more
Caused by: java.lang.UnsupportedOperationException
at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
... 15 more
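The serialization trace points at the root cause. While copying the record between chained operators (KryoSerializer.copy called from OperatorChain$CopyingChainingOutput), Kryo's default CollectionSerializer appears to rebuild the enrichments_ field and refill it with add(). The runtime class of that field is an unmodifiable wrapper from java.util.Collections, so the first add() fails. The stdlib-only sketch below reproduces only that failure mode. It assumes, as the Collections$UnmodifiableCollection frame suggests, that the generated class freezes its repeated field with Collections.unmodifiableList(...); the class and helper names are hypothetical and this is not Kryo's code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class UnmodifiableAddDemo {

    // Hypothetical stand-in for the enrichments_ field of a built message:
    // the repeated field is assumed to be frozen in an unmodifiable wrapper,
    // as the Collections$UnmodifiableCollection frame in the trace suggests.
    static List<String> frozenRepeatedField() {
        return Collections.unmodifiableList(new ArrayList<>(Arrays.asList("geo", "user")));
    }

    // Mimics the refill step of a generic collection deserializer, which
    // re-inserts each element with add(). Returns the simple name of the
    // exception thrown, or "none" if every add() succeeded.
    static String refill(Collection<String> target, List<String> elements) {
        try {
            for (String element : elements) {
                target.add(element);
            }
            return "none";
        } catch (UnsupportedOperationException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        List<String> elements = Arrays.asList("geo", "user");
        System.out.println(refill(new ArrayList<>(), elements));     // none
        System.out.println(refill(frozenRepeatedField(), elements)); // UnsupportedOperationException
    }
}
```

A mutable ArrayList accepts the refill, while the unmodifiable wrapper throws on the first add(), matching the bottom of the trace. This is why a serializer that understands the message format is needed instead of Kryo's field-by-field default.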


I believed that Protobuf objects are serializable with the default Flink configuration since this issue was fixed in 0.9/0.8.1: https://issues.apache.org/jira/browse/FLINK-1392

Maybe they are, and Flink just requires some configuration? 
I'd be grateful for your help with this issue.
Cheers,
Krzysztof

Re: Working with protobuf wrappers

Till Rohrmann
Hi Krzysztof,

It's true that we once added the Protobuf serializer automatically. However, due to versioning conflicts (see https://issues.apache.org/jira/browse/FLINK-1635), we removed it again. Now you have to register the ProtobufSerializer manually: https://ci.apache.org/projects/flink/flink-docs-master/apis/best_practices.html#register-a-custom-serializer-for-your-flink-program.
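For reference, the registration described on the linked page is a one-liner along the lines of env.getConfig().registerTypeWithKryoSerializer(MyObject.class, ProtobufSerializer.class), where ProtobufSerializer is com.twitter.chill.protobuf.ProtobufSerializer from the chill-protobuf library, which has to be added as a dependency. As far as we know, that serializer does not walk the object's fields. Instead it writes the message's own toByteArray() output and rebuilds the object through the generated static parseFrom(byte[]) method, looked up by reflection. The stdlib-only sketch below models that idea. MyObject here is a hypothetical hand-written stand-in for a protoc-generated class with a toy wire format, and copyViaOwnFormat is a hypothetical helper, not chill's or Flink's code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class OwnFormatCopyDemo {

    // Hypothetical stand-in for a protoc-generated message: immutable, with the
    // repeated field frozen in an unmodifiable list, and a generated-style
    // toByteArray()/parseFrom(byte[]) pair. The wire format is a toy one.
    public static final class MyObject {
        private final List<String> enrichments_;

        public MyObject(List<String> enrichments) {
            this.enrichments_ = Collections.unmodifiableList(new ArrayList<>(enrichments));
        }

        public List<String> getEnrichmentsList() {
            return enrichments_;
        }

        public byte[] toByteArray() {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeInt(enrichments_.size());
                for (String value : enrichments_) {
                    out.writeUTF(value);
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            return bytes.toByteArray();
        }

        public static MyObject parseFrom(byte[] data) {
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
                int n = in.readInt();
                List<String> values = new ArrayList<>(n);
                for (int i = 0; i < n; i++) {
                    values.add(in.readUTF());
                }
                return new MyObject(values);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    // Copies a message through its own byte format: serialize with
    // toByteArray(), then rebuild via the static parseFrom(byte[]) factory,
    // looked up reflectively so it works for any class exposing that pair.
    // No add() is ever called on the copy's unmodifiable list.
    static Object copyViaOwnFormat(Object message) {
        try {
            Class<?> type = message.getClass();
            byte[] data = (byte[]) type.getMethod("toByteArray").invoke(message);
            Method parseFrom = type.getMethod("parseFrom", byte[].class);
            return parseFrom.invoke(null, (Object) data);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        MyObject original = new MyObject(Arrays.asList("geo", "user"));
        MyObject copy = (MyObject) copyViaOwnFormat(original);
        System.out.println(copy.getEnrichmentsList()); // [geo, user]
    }
}
```

The design choice is that the message class, not the generic serializer, decides how its internal collections are rebuilt, so the immutability of the repeated field never gets in the way.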

Cheers,
Till

On Mon, Nov 30, 2015 at 8:48 PM, Krzysztof Zarzycki <[hidden email]> wrote:
[...]

Re: Working with protobuf wrappers

rmetzger0
Also, we don't add serializers automatically for DataStream programs. I opened a JIRA issue for this a while ago.

On Tue, Dec 1, 2015 at 10:20 AM, Till Rohrmann <[hidden email]> wrote:
[...]

Re: Working with protobuf wrappers

Flavio Pompermaier
Sorry for the long question, but I'd like to take advantage of this discussion to ask about something I've never fully understood. Let's say I have, for example, a Thrift/Protobuf/Avro object Person.
  1. Do I really have to register a custom serializer? In my code I create a DataSet from parquet-thrift, but I never had to register anything. Does anything change if I call registerTypeWithKryoSerializer?
  2. How is Flink's performance affected by using one serializer rather than another? For example, is there a simple snippet of a Flink program that shows when it's better to use the original Person, its POJO version, or its Tuple version (assuming it is a flat object)?
  3. Does this change further when I use the Table API?

Best,
Flavio

On Tue, Dec 1, 2015 at 10:25 AM, Robert Metzger <[hidden email]> wrote:
[...]

Re: Working with protobuf wrappers

rmetzger0
Hi Flavio,

1. You don't have to register serializers if it's working for you. I would add a custom serializer if it's not working or if the performance is poor.
2. I don't think such a performance comparison exists. Tuples are a little faster than POJOs; other types (serialized with Kryo's standard serializer) are usually slower.
3. There are some plans for the Table API to do various optimizations (projection/filter push-down), which also make some assumptions about the serializers. So yes, this might change for the Table API.


On Tue, Dec 1, 2015 at 11:26 AM, Flavio Pompermaier <[hidden email]> wrote:
[...]

Re: Working with protobuf wrappers

Krzysztof Zarzycki
Thanks, guys, for your answers; that is exactly the information I was looking for. 

Krzysztof

2015-12-01 19:22 GMT+01:00 Robert Metzger <[hidden email]>:
[...]