AvroSerializer

Debasish Ghosh
Hello -

I am using Avro-based encoding with Flink. I see that Flink has an AvroSerializer that it uses for serializing Avro records. Is it possible to provide a custom implementation of the serializer? For example, I want to use MyAvroSerializer instead of AvroSerializer in *all* places. Is there any way to register such a custom serializer?

Re: AvroSerializer

Konstantin Knauf-2
Hi Debasish,

this should be possible via

env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, MyCustomSerializer.class);

You can check that the correct serializer is used with

TypeInformation.of(MyCustomType.class).createSerializer(env.getConfig());

In this case your serializer needs to extend Kryo's Serializer class [1]. Alternatively, you can have a look at the @TypeInfo annotation [2].

Cheers,
Konstantin

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/custom_serializers.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.8/api/java/org/apache/flink/api/common/typeinfo/TypeInfo.html

On Mon, May 13, 2019 at 6:50 PM Debasish Ghosh <[hidden email]> wrote:
Hello -

I am using Avro based encoding with Flink. I see that Flink has an AvroSerializer that gets used for serializing Avro. Is it possible to provide a custom implementation of the serializer e.g. I want to use MyAvroSerializer instead of AvroSerializer in *all* places. Is there any way to register such a custom serializer ?



--

Konstantin Knauf | Solutions Architect

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany


Re: AvroSerializer

Debasish Ghosh
Hi Konstantin -

I did take a look at the option you mentioned. With it I can register a custom serializer for a custom type, but my requirement is a bit different: I would like to use a custom AvroSerializer for *all* types that extend Avro's SpecificRecordBase. The reason is that I would like an Avro serializer that bypasses the problems described in https://issues.apache.org/jira/browse/FLINK-12501. I have such a serializer (it is not a Kryo serializer, though) and I would like to use it in place of AvroSerializer.

regards.
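
The difference between registering a serializer for one type and applying one to every subtype of a base class can be sketched with a small toy model in plain Java. This is not Flink, Kryo, or Avro code: `RecordBase`, `UserRecord`, `OrderRecord`, `SerializerLookupDemo`, and both `register...` methods are hypothetical names made up for illustration, and serializers are represented by plain strings. It only shows why a registration keyed on one exact class does not cover other subclasses of the same base, whereas a rule keyed on the base class would.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model only: hypothetical stand-ins, not Flink, Kryo, or Avro types.
abstract class RecordBase {}             // plays the role of Avro's SpecificRecordBase
class UserRecord extends RecordBase {}
class OrderRecord extends RecordBase {}

class SerializerLookupDemo {
    // Registrations that match one exact class only.
    private final Map<Class<?>, String> exact = new LinkedHashMap<>();
    // Rules that match every class assignable to the given base type.
    private final Map<Class<?>, String> bySupertype = new LinkedHashMap<>();

    void registerType(Class<?> type, String serializerName) {
        exact.put(type, serializerName);
    }

    void registerForSubtypesOf(Class<?> base, String serializerName) {
        bySupertype.put(base, serializerName);
    }

    // Exact matches win; otherwise the first matching supertype rule; otherwise a default.
    String resolve(Class<?> type) {
        String hit = exact.get(type);
        if (hit != null) {
            return hit;
        }
        for (Map.Entry<Class<?>, String> rule : bySupertype.entrySet()) {
            if (rule.getKey().isAssignableFrom(type)) {
                return rule.getValue();
            }
        }
        return "DefaultSerializer";
    }

    public static void main(String[] args) {
        SerializerLookupDemo registry = new SerializerLookupDemo();

        // Per-type registration covers only the registered class.
        registry.registerType(UserRecord.class, "MyAvroSerializer");
        System.out.println(registry.resolve(UserRecord.class));  // prints MyAvroSerializer
        System.out.println(registry.resolve(OrderRecord.class)); // prints DefaultSerializer

        // A base-type rule covers every subclass, which is what the question asks for.
        registry.registerForSubtypesOf(RecordBase.class, "MyAvroSerializer");
        System.out.println(registry.resolve(OrderRecord.class)); // prints MyAvroSerializer
    }
}
```

Whether Flink offers a hook of the second kind for Avro types is exactly the open question at this point in the thread; the sketch does not answer it.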

On Tue, May 14, 2019 at 12:38 PM Konstantin Knauf <[hidden email]> wrote:
Hi Debasish, 

this should be possible via 

env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, MyCustomSerializer.class);

You can check that the correct serializer is used with
TypeInformation.of(MyCustomType.class).createSerializer(env.getConfig());
In this case your serializer needs to extend Kryo's Serializer class [1]. Alternatively, you can have a look at the @TypeInfo annotation [2].
Cheers, 
Konstantin
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/custom_serializers.html 
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.8/api/java/org/apache/flink/api/common/typeinfo/TypeInfo.html


Re: AvroSerializer

Rafi Aroch
Hi Debasish,

It would be a bit tedious, but in order to override the default AvroSerializer you could specify a TypeInformation object wherever one is needed.
You would need to implement your own MyAvroTypeInfo to use instead of the provided AvroTypeInfo.

For example:
env.addSource(kafkaConsumer)
.returns(new MyAvroTypeInfo<>(SomeAvro.class));
This should override the default AvroSerializer.

Or when using some operator:
.window(EventTimeSessionWindows.withGap(joinGap))
.apply(new JoinFunction(), new MyAvroTypeInfo<>(SomeAvro.class));

Another benefit of this approach over the Kryo serializer option is that it can support state migration.

Hope this helps,
Rafi
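
The idea behind this suggestion can be sketched with a toy model in plain Java: the type information object decides which serializer is created, so passing a different type information object at the call site changes the serializer for that stream only. None of this is Flink code. `TypeInfoLike`, `DefaultAvroInfo`, `MyAvroInfo`, `StreamLike`, and `ExplicitTypeInfoDemo` are hypothetical stand-ins, and serializers are represented by their names as strings.

```java
// Toy model only: hypothetical stand-ins, not Flink classes.
interface TypeInfoLike<T> {
    // The type information decides which serializer is used for T.
    String createSerializerName();
}

// Plays the role of the provided AvroTypeInfo, which yields the default serializer.
class DefaultAvroInfo<T> implements TypeInfoLike<T> {
    @Override
    public String createSerializerName() {
        return "AvroSerializer";
    }
}

// Plays the role of MyAvroTypeInfo: same job, different serializer.
class MyAvroInfo<T> implements TypeInfoLike<T> {
    @Override
    public String createSerializerName() {
        return "MyAvroSerializer";
    }
}

// Plays the role of a stream whose element type information can be set explicitly.
class StreamLike<T> {
    private TypeInfoLike<T> typeInfo = new DefaultAvroInfo<>();

    // Replaces the default type information for this stream only.
    StreamLike<T> returns(TypeInfoLike<T> explicit) {
        this.typeInfo = explicit;
        return this;
    }

    String serializerInUse() {
        return typeInfo.createSerializerName();
    }
}

class ExplicitTypeInfoDemo {
    public static void main(String[] args) {
        // Without an explicit type information object the default applies.
        StreamLike<String> inferred = new StreamLike<>();
        System.out.println(inferred.serializerInUse()); // prints AvroSerializer

        // Passing the custom type information overrides it at this call site.
        StreamLike<String> overridden = new StreamLike<String>().returns(new MyAvroInfo<>());
        System.out.println(overridden.serializerInUse()); // prints MyAvroSerializer
    }
}
```

The override is per call site, which is why the approach is tedious: every source or operator that produces the Avro type needs the explicit type information.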


On Tue, May 14, 2019 at 10:23 AM Debasish Ghosh <[hidden email]> wrote:
Hi Konstantin -

I did take a look at the option you mentioned. Using that option I can register a custom serializer for a custom type. But my requirement is a bit different - I would like to have a custom AvroSerializer for *all* types which implement SpecificRecordBase of Avro. The reason is I would like an avro serializer that bypasses the problems mentioned in https://issues.apache.org/jira/browse/FLINK-12501. I have such a serializer (it's not Kryo serializer though) and I would like to use it in place of AvroSerializer. 

regards.


Re: AvroSerializer

Debasish Ghosh
Thanks, Rafi. I will try it out.

On Tue, 14 May 2019 at 1:26 PM, Rafi Aroch <[hidden email]> wrote:
Hi Debasish,

It would be a bit tedious, but in order to override the default AvroSerializer you could specify a TypeInformation object where needed.
You would need to implement your own MyAvroTypeInfo instead of the provided AvroTypeInfo.

For example:
env.addSource(kafkaConsumer)
.returns(new MyAvroTypeInfo<>(SomeAvro.class));
This should override the default AvroSerializer.

Or when using some operator:
.window(EventTimeSessionWindows.withGap(joinGap))
.apply(new JoinFunction(), new MyAvroTypeInfo<>(SomeAvro.class));

Another benefit of this approach over the Kryo serializer option is that you would support state migration.

Hope this helps,
Rafi

