Hello -
I am using Avro based encoding with Flink. I see that Flink has an AvroSerializer that gets used for serializing Avro. Is it possible to provide a custom implementation of the serializer e.g. I want to use MyAvroSerializer instead of AvroSerializer in *all* places. Is there any way to register such a custom serializer ? |
Hi Debasish, this should be possible via env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, MyCustomSerializer.class); You can check that the correct serializer is used with TypeInformation.of(MyCustomType.class).createSerializer(env.getConfig()); In this case your serializer needs to implements Kryo's serializer interface. Alternatively, you can have a look at @TypeInfo Annotation [1]. Cheers, Konstantin [1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/custom_serializers.html [2] https://ci.apache.org/projects/flink/flink-docs-release-1.8/api/java/org/apache/flink/api/common/typeinfo/TypeInfo.html On Mon, May 13, 2019 at 6:50 PM Debasish Ghosh <[hidden email]> wrote:
Follow us @VervericaData -- Join Flink Forward - The Apache Flink Conference Stream Processing | Event Driven | Real Time -- Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany -- Data Artisans GmbHRegistered at Amtsgericht Charlottenburg: HRB 158244 B Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen |
Hi Konstantin - I did take a look at the option you mentioned. Using that option I can register a custom serializer for a custom type. But my requirement is a bit different - I would like to have a custom AvroSerializer for *all* types which implement SpecificRecordBase of Avro. The reason is I would like an avro serializer that bypasses the problems mentioned in https://issues.apache.org/jira/browse/FLINK-12501. I have such a serializer (it's not Kryo serializer though) and I would like to use it in place of AvroSerializer. regards. On Tue, May 14, 2019 at 12:38 PM Konstantin Knauf <[hidden email]> wrote:
Debasish Ghosh http://manning.com/ghosh2 |
Hi Debasish, It would be a bit tedious, but in order to override the default AvroSerializer you could specify a TypeInformation object where needed. You would need to implement your own MyAvroTypeInfo instead of the provided AvroTypeInfo. For example: env.addSource(kafkaConsumer) This should override the default AvroSerializer. Or when using some operator: .window(EventTimeSessionWindows.withGap(joinGap)) Another benefit of this approach over the Kryo serializer option is that you would support state migration. Hope this helps, Rafi On Tue, May 14, 2019 at 10:23 AM Debasish Ghosh <[hidden email]> wrote:
|
Thanks Rafi .. will try it out .. On Tue, 14 May 2019 at 1:26 PM, Rafi Aroch <[hidden email]> wrote:
Sent from my iPhone
|
Free forum by Nabble | Edit this page |