How to use AvroDeserializationSchema in flink kafka consumer

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

How to use AvroDeserializationSchema in flink kafka consumer

kkmnmaps
Thanks to Fabian Hueske's (Stack overflow member) help, I am able to run simple Kafka Flink Consumer that receives String messages. Now I am in the process of modifying it to receive binary(structured) data using Avro serialization/deserialization.

I have created a simple .avsc file with the following content:

{                                                                              
    "type": "record",                                                          
    "namespace": "com.mycompany.app",                                          
    "name": "Emp",                                                            
    "fields": [                                                                
        {                                                                      
            "name": "id",                                                      
            "type": "int"                                                      
        },                                                                      
        {                                                                      
            "name": "name",                                                      
            "type": "string"                                                      
        },                                                                      
        {                                                                      
            "name": "salary",                                                    
            "type": "float"                                                      
        }]
}
The avro documentation says, maven should be able to compile this and generate the class files. But it is not clear to me where this file should be placed for the generation to take place. I have run the following command

java -jar avro-tools-1.7.7.jar schema ./emp.avsc .
and it created two files Event.java and Emp.java

When I try to use the AVRO Deserializer with Emp.class as follows

AvroDeserializationSchema<Emp> avroSchema = new AvroDeserializationSchema<Emp>(Emp.class);
DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer09<String>(parameterTool.getRequired("topic"),
        avroSchema, parameterTool.getProperties()));

I am getting error that there is 'no suitable constructor' for FlinkKafkaConsumer09 with avroSchema as second parameter.

What am I doing wrong? Is there any example code that shows how to use AVRO Deserializer schema? I have downloaded AVRO deserializer from  I have downloaded AvroDeserializationSchema.java from https://gist.github.com/StephanEwen/d515e10dd1c609f70bed.

I appreciate any help.

Thanks
KNM