Re: Kafka + confluent schema registry Avro parsing
Posted by Maximilian Michels
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Kafka-confluent-schema-registry-Avro-parsing-tp3578p3580.html
Hi Madhukar,
Thanks for your question. When you instantiate the FlinkKafkaConsumer, you supply a DeserializationSchema in the constructor. You simply create a class which implements DeserializationSchema and holds a KafkaAvroDecoder backed by the schema registry client (note that DeserializationSchema also asks for the produced type via getProducedType()).
Like so:
public class MyAvroDeserializer implements DeserializationSchema<MyType> {
    private KafkaAvroDecoder decoder;

    public MyAvroDeserializer() {
        // CachedSchemaRegistryClient is Confluent's implementation of the SchemaRegistryClient interface
        SchemaRegistryClient schemaRegistry = new CachedSchemaRegistryClient(...);
        this.decoder = new KafkaAvroDecoder(schemaRegistry);
    }

    @Override
    public MyType deserialize(byte[] message) throws Exception {
        return (MyType) this.decoder.fromBytes(message);
    }

    @Override
    public boolean isEndOfStream(MyType nextElement) {
        return false;
    }

    @Override
    public TypeInformation<MyType> getProducedType() {
        // DeserializationSchema also needs the produced type for Flink's type system
        return TypeExtractor.getForClass(MyType.class);
    }
}
Then you supply this class when creating the consumer:
DeserializationSchema<MyType> decoder = new MyAvroDeserializer();

// Kafka connection properties, e.g. "zookeeper.connect", "bootstrap.servers", "group.id"
Properties props = new Properties();

FlinkKafkaConsumer.OffsetStore offsetStore = FlinkKafkaConsumer.OffsetStore.KAFKA;
FlinkKafkaConsumer.FetcherType fetcherType = FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL;

FlinkKafkaConsumer<MyType> consumer =
    new FlinkKafkaConsumer<MyType>("myTopic", decoder, props, offsetStore, fetcherType);
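Once you have the consumer, you plug it into your streaming job as a regular source. A minimal sketch (the print sink and job name below are just placeholders for whatever your program does):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// The consumer acts as a source producing MyType records
DataStream<MyType> stream = env.addSource(consumer);

// ... your transformations and sinks go here ...
stream.print();

env.execute("Kafka Avro consumer");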
Let me know if that works for you.
Best regards,
Max