Hi,
I have a dataflow with a Kafka source that uses the Avro-generated class to deserialize records:
AvroDeserializer<MyAvroType> avroSchema =
    new AvroDeserializer<>(MyAvroType.class);
FlinkKafkaConsumer011<MyAvroType> kafkaReader =
    new FlinkKafkaConsumer011<MyAvroType>("kafkaTopic", avroSchema, properties);
From this point on, I would like Flink to use a specific serializer for that type, so I define:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.getConfig().registerTypeWithKryoSerializer(MyAvroType.class, AvroDataEncryptionSerializer.class);
env.getConfig().addDefaultKryoSerializer(MyAvroType.class, AvroDataEncryptionSerializer.class);
env.getConfig().enableForceKryo();
where:
MyAvroType: the class generated by Avro.
AvroDeserializer: a simple deserializer that implements
org.apache.flink.api.common.serialization.DeserializationSchema.
AvroDataEncryptionSerializer: a custom serializer that extends
com.esotericsoftware.kryo.Serializer<MyAvroType>.
However, each time the object needs to be serialized (e.g. for a state snapshot), I see that
org.apache.flink.formats.avro.typeutils.AvroSerializer.serialize is invoked instead of my custom serializer.
Am I doing something wrong?
Thanks,
-Enrico