Providing Custom Serializer for Generic Type


Andrea Spina
Dear community,
in my job I use a custom event type, MyClass, a sort of "generic event" that I handle throughout my streaming flow, both as an event (DataStream[MyClass]) and as managed state.

I see that Flink warns me about generic serialization of MyClass:

 INFO [run-main-0] (TypeExtractor.java:1818) - class io.radicalbit.MyClass does not contain a setter for field io$radicalbit$MyClass$$schema
 INFO [run-main-0] (TypeExtractor.java:1857) - Class class io.radicalbit.MyClass cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
 INFO [run-main-0] (TypeExtractor.java:1818) - class io.radicalbit.MyClass does not contain a setter for field io$radicalbit$MyClass$schema


So I wanted to provide my own serializer for MyClass. As a first step I tried registering the Java one, following [1], to check whether the system picks it up, but it seems to be ignored.

I then read [2] (a case very similar to mine) and, AFAIU, I need to implement a custom TypeInformation and TypeSerializer for my class, as suggested in [3], because Flink will ignore my registered serializer as long as it considers my type generic.

config.registerTypeWithKryoSerializer(classOf[MyClass], classOf[RadicalSerde])

My question, finally: do I need to provide these custom classes? Is there a practical example of creating custom type information like the one mentioned above? I had a quick preliminary look, but it seems I need to supply a non-trivial amount of information to the TypeInformation and TypeSerializer interfaces.

Thank you for your excellent work and help.

Cheers. 

[1] - https://ci.apache.org/projects/flink/flink-docs-stable/dev/custom_serializers.html
[2] - http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-Serializer-for-Avro-GenericRecord-td25433.html
[3] - https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/types_serialization.html#defining-type-information-using-a-factory
--
Andrea Spina
Head of R&D @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT

Re: Providing Custom Serializer for Generic Type

JingsongLee
Hi Andrea:
Why not make MyClass a POJO? [1] If it is a POJO, Flink will use PojoTypeInfo and PojoSerializer, which already have a good implementation.
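
For illustration, here is a minimal sketch of what the POJO route could look like from Scala. SensorEvent, OpaqueEvent and PojoRules are hypothetical names, not part of Flink or of the original job. looksLikePojo only approximates the rules listed in the Flink documentation (public class, public no-argument constructor, every field either public or reachable through a getter and a setter); it is not Flink's TypeExtractor and it does not check whether the field types themselves are supported.

```scala
import java.lang.reflect.Modifier
import scala.beans.BeanProperty

// Hypothetical event type: public class, no-arg constructor,
// and bean-style getters/setters generated by @BeanProperty.
class SensorEvent(@BeanProperty var id: String, @BeanProperty var reading: Double) {
  def this() = this("", 0.0)
}

// Hypothetical counter-example: a private immutable field, no setter,
// and no no-arg constructor (similar in spirit to the logged warning).
class OpaqueEvent(private val schema: String) {
  def describe: String = schema
}

object PojoRules {
  // Rough approximation of the documented POJO rules; an assumption-laden
  // sketch, not the check Flink actually performs.
  def looksLikePojo(c: Class[_]): Boolean = {
    val isPublic = Modifier.isPublic(c.getModifiers)
    val hasNoArgCtor = c.getConstructors.exists(_.getParameterCount == 0)
    val methodNames = c.getMethods.map(_.getName.toLowerCase).toSet
    // Static and transient fields are left out of the check.
    val fields = c.getDeclaredFields.filterNot { f =>
      Modifier.isStatic(f.getModifiers) || Modifier.isTransient(f.getModifiers)
    }
    val fieldsOk = fields.forall { f =>
      val name = f.getName.toLowerCase
      Modifier.isPublic(f.getModifiers) ||
        (methodNames.contains("get" + name) && methodNames.contains("set" + name))
    }
    isPublic && hasNoArgCtor && fieldsOk
  }
}
```

With these definitions, PojoRules.looksLikePojo(classOf[SensorEvent]) holds, while the same call on classOf[OpaqueEvent] does not, because that class has neither a no-argument constructor nor a getter/setter pair for schema.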


Best, JingsongLee

Re: Providing Custom Serializer for Generic Type

Andrea Spina
Hi JingsongLee, thank you for your answer.
Honestly, I wanted to keep that option as a last resort. Still, if defining custom serializers and type information takes a lot of effort, I will reconsider it.

Cheers,

Re: Providing Custom Serializer for Generic Type

Tzu-Li (Gordon) Tai
Hi Andrea,

Is there a specific reason you want to use a custom TypeInformation / TypeSerializer for your type?
From the description in the original post, this part wasn't clear to me.

If the only reason is that it is generally recommended to avoid generic type serialization via Kryo, both for performance and for future evolvability, then changing your type so that Flink recognizes it as one of the supported types [1] would be enough.
Otherwise, implementing your own type information and serializer is usually something only users with very specific use cases need to do.
Since you also use that type as managed state, for safer schema evolution in the future I would recommend either Avro or POJO, as Jingsong Lee already mentioned.

Cheers,
Gordon

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/types_serialization.html#flinks-typeinformation-class

Re: Providing Custom Serializer for Generic Type

Andrea Spina
Hi Gordon, thank you. 
The data structure involved is a complex abstraction that owns a schema and values; it declares private fields that users should not edit directly. I'd say it is very similar to an Avro GenericRecord. How would you approach the problem of serializing and deserializing an Avro GenericRecord efficiently? I don't think it can be a POJO, and ser/de through Avro brings a lot of overhead, as also described in [1].
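
To make the "schema plus values" shape concrete, here is a minimal sketch of the write/read pair that a hand-written serializer would revolve around. Record and RecordCodec are hypothetical names, and the assumption that every value is a Long is purely illustrative; the real MyClass is not shown in this thread. The code targets java.io.DataOutput and java.io.DataInput, which Flink's DataOutputView and DataInputView extend, but it is not a TypeSerializer implementation: that interface also asks for copying, duplication, snapshotting and more.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.io.{DataInput, DataInputStream, DataOutput, DataOutputStream}

// Hypothetical stand-in for MyClass: field names plus one Long value per field.
final case class Record(schema: Vector[String], values: Vector[Long])

object RecordCodec {
  // Writes the field count, then the field names, then the values.
  def write(r: Record, out: DataOutput): Unit = {
    require(r.schema.size == r.values.size, "one value per schema field")
    out.writeInt(r.schema.size)
    r.schema.foreach(name => out.writeUTF(name))
    r.values.foreach(v => out.writeLong(v))
  }

  // Reads back exactly what write produced, in the same order.
  def read(in: DataInput): Record = {
    val n = in.readInt()
    val schema = Vector.fill(n)(in.readUTF())
    val values = Vector.fill(n)(in.readLong())
    Record(schema, values)
  }

  // Helper for trying the pair out: serialize to bytes and read back.
  def roundTrip(r: Record): Record = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    write(r, out)
    out.flush()
    read(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray)))
  }
}
```

This sketch writes the schema with every record, which is simple but repetitive; if the schema is fixed for a whole stream, holding it in the serializer rather than in each record would be the obvious thing to explore.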

Thank you very much for your help.

Andrea

[1] - http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-Serializer-for-Avro-GenericRecord-td25433.html
