Re: Kafka + confluent schema registry Avro parsing

Posted by Maximilian Michels on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Kafka-confluent-schema-registry-Avro-parsing-tp3578p3580.html

Hi Madhukar,

Thanks for your question. When you instantiate the FlinkKafkaConsumer, you supply a DeserializationSchema in the constructor. You simply create a class which implements DeserializationSchema and contains the KafkaAvroDecoder with the schema registry.

Like so:

public class MyAvroDeserializer implements DeserializationSchema<MyType> {

    private KafkaAvroDecoder decoder;

    public MyAvroDeserializer() {
         SchemaRegistryClient schemaRegistry = new SchemaRegistryClient(...);
         this.decoder = new KafkaAvroDecoder(schemaRegistry);
    }

    public MyType deserialize(byte[] message) throws Exception {
         return (MyType) this.decoder.fromBytes(messages);
    }

    public boolean isEndOfStream(MyType nextElement) {
         return false;
    }

}

Then you supply this class when creating the consumer:

DeserializationSchema<MyType> decoder = new MyAvroDeserializer()
Properties props = new Properties();
OffsetStore offsetStore = FlinkKafkaConsumer.OffsetStore.KAFKA;
FetcherType fetcherType = FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL;

FlinkKafkaConsumer consumer = new FlinkKafkaConsumer("myTopic"), decoder, props, offsetStore, fetcherType);



Let me know if that works for you.

Best regards,
Max

On Thu, Nov 19, 2015 at 9:51 AM, Madhukar Thota <[hidden email]> wrote:
Hi 

I am very new to Avro. Currently I am using confluent Kafka version and I am able to write an Avro message to Kafka by storing schema in schema registry. Now I need to consume those messages using Flink Kafka Consumer and having a hard time to deserialize the messages. 

I am looking for an example on how to deserialize Avro message where schema is stored in schema registry.

Any help is appreciated. Thanks in Advance. 

Thanks,
Madhu