Kryo Serializator to override AVRO default

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

Kryo Serializator to override AVRO default

Enrico Agnoli

Hi,

 

I have a dataflow with a Kafka source that uses the avro generated class to deserialize:

    AvroDeserializer<MyAvroType> avroSchema = new AvroDeserializer<>( MyAvroType.class);

    FlinkKafkaConsumer011<MyAvroType> kafkaReader = new FlinkKafkaConsumer011<myAvroType>( "kafkaTopic", avroSchema, properties);

 

From this point on I would like that Flink uses a specific Serializer for that type, so I define:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // env.getConfig().registerTypeWithKryoSerializer(MyAvroType.class, AvroDataEncryptionSerializer.class);
    env.getConfig().addDefaultKryoSerializer(MyAvroType.class, AvroDataEncryptionSerializer.class);
    env.getConfig().enableForceKryo();

 

MyAvroType > is the class generated by Avro

AvroDeserializer > Is a simple AvroDeserializer that implements org.apache.flink.api.common.serialization.DeserializationSchema

AvroDataEncryptionSerializer > Is a custom serializer that extends com.esotericsoftware.kryo.Serializer<MyAvroType>

 

however I see that each time the object needs to be serialized (e.g.: via State snapshot) the org.apache.flink.formats.avro.typeutils.AvroSerializer.serialize is invoked.

 

Am I doing something wrong?

 

Thanks,

-Enrico