What is the recommended way to read AVRO data from Kafka using flink.

5 messages
What is the recommended way to read AVRO data from Kafka using flink.

Alam, Zeeshan

Hi,

I am using Flink 1.0.3 and FlinkKafkaConsumer08 to read Avro data from Kafka. I have the Avro schema file that was used to write the data into Kafka. Here https://ci.apache.org/projects/flink/flink-docs-release-0.8/example_connectors.html you have mentioned that using the GenericData.Record type is possible with Flink, but not recommended: since every record carries the full schema, it is very data-intensive and thus probably slow to use. So what is the recommended way to read Avro data from Kafka using Flink?

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "dojo3xxxxx:9092,dojoxxxxx:9092,dojoxxxxx:9092");
    properties.setProperty("zookeeper.connect", "dojo3xxxxx:2181,dojoxxxxx:2181,dojoxxxxx:2181");
    properties.setProperty("group.id", "Zeeshantest");

    AvroDeserializationSchema<GenericData.Record> avroSchema =
            new AvroDeserializationSchema<>(GenericData.Record.class);
    FlinkKafkaConsumer08<GenericData.Record> kafkaConsumer =
            new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties);

    DataStream<GenericData.Record> messageStream = env.addSource(kafkaConsumer);
    messageStream.rebalance().print();

    env.execute("Flink AVRO KAFKA Test");
}

 

This is the AvroDeserializationSchema that I am using:

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {

    private static final long serialVersionUID = 4330538776656642778L;

    private final Class<T> avroType;
    private transient DatumReader<T> reader;
    private transient BinaryDecoder decoder;

    public AvroDeserializationSchema(Class<T> avroType) {
        this.avroType = avroType;
    }

    @Override
    public T deserialize(byte[] message) {
        ensureInitialized();
        try {
            decoder = DecoderFactory.get().binaryDecoder(message, decoder);
            return reader.read(null, decoder);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(avroType);
    }

    private void ensureInitialized() {
        if (reader == null) {
            if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
                reader = new SpecificDatumReader<T>(avroType);
            } else {
                reader = new ReflectDatumReader<T>(avroType);
            }
        }
    }
}

 

On running this I get: java.lang.Exception: Not a Specific class: class org.apache.avro.generic.GenericData$Record.

 

Thanks & Regards

Zeeshan Alam



Re: What is the recommended way to read AVRO data from Kafka using flink.

Stephan Ewen
Hi!

I think this is a known limitation of Flink 1.0, and it is fixed in Flink 1.1.

Here is the mail thread:

The 1.1 release is also in progress, so the fix should be available in a stable release soon.

Greetings,
Stephan


On Tue, Aug 2, 2016 at 4:04 PM, Alam, Zeeshan <[hidden email]> wrote:




RE: What is the recommended way to read AVRO data from Kafka using flink.

Alam, Zeeshan

Hi Stephan,

I went through one of the old mail threads: http://mail-archives.apache.org/mod_mbox/flink-user/201510.mbox/%3CCANC1h_vq-TVjTNhXyYLoVso7GRGkdGWioM5Ppg%3DGoQPjvigqYg%40mail.gmail.com%3E

There it is mentioned that when reading from Kafka you are expected to define a DeserializationSchema, and that there is no out-of-the-box Avro (de)serializer for Flink with Kafka, but it should not be very hard to add one.

I have some questions:

1. As per FLINK-3691 you are adding a GenericDatumReader, so I suppose I need to use it instead of DatumReader in the DeserializationSchema that is required to read data from Kafka?

2. What is the recommended way to read Avro binary data from Kafka if I have the Avro schema file [*.avsc ] with me? Is there a better, more efficient approach?

3. Can AvroInputFormat be used to read Kafka data, or is a DeserializationSchema a must for reading from Kafka? Also, AvroInputFormat doesn't have any Javadoc with it.

Thanks & Regards,

Zeeshan Alam

 

 

 

From: Stephan Ewen [mailto:[hidden email]]
Sent: Tuesday, August 02, 2016 7:52 PM
To: [hidden email]
Subject: Re: What is the recommended way to read AVRO data from Kafka using flink.

 



Re: What is the recommended way to read AVRO data from Kafka using flink.

Stephan Ewen
Hi!

To read data from Kafka, you need a DeserializationSchema. You could create one that wraps the AvroInputFormat, but an AvroDeserializationSchema would essentially be an adaptation of the AvroInputFormat to the DeserializationSchema interface.

In your Avro DeserializationSchema, you can probably create the Avro readers internally from an Avro schema (I believe).

Greetings,
Stephan
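[Editor's note: following this suggestion, a schema-based deserialization schema could look roughly like the sketch below. It is an illustration only, not code from the thread: it assumes Avro's Schema.Parser/GenericDatumReader APIs and Flink's DeserializationSchema interface, and the class name GenericAvroDeserializationSchema is hypothetical.]

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

// Sketch: deserialize Kafka messages to GenericRecord using a known Avro
// schema, so no per-record embedded schema and no reflection is needed.
public class GenericAvroDeserializationSchema implements DeserializationSchema<GenericRecord> {

    private static final long serialVersionUID = 1L;

    // JSON schema text read from the .avsc file; kept as a String because
    // Avro's Schema class is not serializable in older Avro versions.
    private final String schemaJson;

    private transient GenericDatumReader<GenericRecord> reader;
    private transient BinaryDecoder decoder;

    public GenericAvroDeserializationSchema(String schemaJson) {
        this.schemaJson = schemaJson;
    }

    @Override
    public GenericRecord deserialize(byte[] message) {
        if (reader == null) {
            // Rebuild the reader lazily after the schema object is shipped to the workers.
            Schema schema = new Schema.Parser().parse(schemaJson);
            reader = new GenericDatumReader<>(schema);
        }
        try {
            decoder = DecoderFactory.get().binaryDecoder(message, decoder);
            return reader.read(null, decoder);
        } catch (IOException e) {
            throw new RuntimeException("Failed to deserialize Avro record", e);
        }
    }

    @Override
    public boolean isEndOfStream(GenericRecord nextElement) {
        return false;
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        return TypeExtractor.getForClass(GenericRecord.class);
    }
}

It would then replace the reflection-based schema in the consumer, e.g. new FlinkKafkaConsumer08<>("myavrotopic", new GenericAvroDeserializationSchema(schemaJson), properties).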


On Tue, Aug 2, 2016 at 4:53 PM, Alam, Zeeshan <[hidden email]> wrote:

Hi Stephan,

 

I went through one of the old mail thread http://mail-archives.apache.org/mod_mbox/flink-user/201510.mbox/%3CCANC1h_vq-TVjTNhXyYLoVso7GRGkdGWioM5Ppg%3DGoQPjvigqYg%40mail.gmail.com%3E

 

Here it is mentioned that  When reading from Kafka you are expected to define a DeserializationSchema. There is no out of the box (de)serializer for Flink with Kafka, but it should be not very hard to add.
 
I have some questions:
 
1.       As per FLINK-3691  you are adding GenericDatumReader, so I suppose I need to use it instead of DatumReader in my  DeserializationSchema which is required to read data from Kafka?
 
2.  What is the recommended way to read AVRO binary data from Kafka if I  have the AVRO schema file [*.avsc ] with me? Is there a better more efficient approach?

 

3.       Can AvroInputFormat be used to read Kafka data or DeserializationSchema is a must to read data from Kafka, also AvroInputFormat doesn’t have any javaDoc with it.
 

 

 

Thanks & Regards,

Zeeshan Alam

 

 

 

From: Stephan Ewen [mailto:[hidden email]]
Sent: Tuesday, August 02, 2016 7:52 PM
To: [hidden email]
Subject: Re: What is the recommended way to read AVRO data from Kafka using flink.

 

Hi!

 

I think this is a known limitation for Flink 1.0 and it is fixed in Flink 1.1

 

 

Here is the mail thread:

 

 

The release is also happening, so should be out in a stable release soon.

 

Greetings,

Stephan

 

 

On Tue, Aug 2, 2016 at 4:04 PM, Alam, Zeeshan <[hidden email]> wrote:

Hi,

 

I am using Flink 1.0.3 and FlinkKafkaConsumer08 to read AVRO data from flink. I am having the AVRO schema file with me which was used to write data in Kafka. Here https://ci.apache.org/projects/flink/flink-docs-release-0.8/example_connectors.html you have mentioned that using the GenericData.Record type is possible with Flink, but not recommended. Since the record contains the full schema, its very data intensive and thus probably slow to use. So what is the recommended way to read AVRO data from Kafka using flink.

 

public static void main(String[] args) throws Exception {

              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

              Properties properties = new Properties();

              properties.setProperty("bootstrap.servers", "dojo3xxxxx:9092,dojoxxxxx:9092,dojoxxxxx:9092");

              properties.setProperty("zookeeper.connect", "dojo3xxxxx:2181,dojoxxxxx:2181,dojoxxxxx:2181");

              properties.setProperty("group.id", "Zeeshantest");

              AvroDeserializationSchema<GenericData.Record> avroSchema = new AvroDeserializationSchema<>(GenericData.Record.class);

              FlinkKafkaConsumer08<GenericData.Record> kafkaConsumer = new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties);

              DataStream<GenericData.Record> messageStream = env.addSource(kafkaConsumer);

              messageStream.rebalance().print();

              env.execute("Flink AVRO KAFKA Test");

       }

 

This is the AvroDeserializationSchema that I am using.

 

 

public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {

 

       private static final long serialVersionUID = 4330538776656642778L;

 

       private final Class<T> avroType;

       private transient DatumReader<T> reader;

       private transient BinaryDecoder decoder;

 

       public AvroDeserializationSchema(Class<T> avroType) {

              this.avroType = avroType;

       }

 

       @Override

       public T deserialize(byte[] message) {

              ensureInitialized();

              try {

                     decoder = DecoderFactory.get().binaryDecoder(message, decoder);

                     return reader.read(null, decoder);

              } catch (Exception e) {

                     throw new RuntimeException(e);

              }

       }

 

       @Override

       public boolean isEndOfStream(T nextElement) {

              return false;

       }

 

       @Override

       public TypeInformation<T> getProducedType() {

              return TypeExtractor.getForClass(avroType);

       }

 

       private void ensureInitialized() {

              if (reader == null) {

                     if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {

                           reader = new SpecificDatumReader<T>(avroType);

                     } else {

                           reader = new ReflectDatumReader<T>(avroType);

                     }

              }

       }

}

 

On running this I am getting java.lang.Exception: Not a Specific class: class org.apache.avro.generic.GenericData$Record.

 

Thanks & Regards

Zeeshan Alam

 

 

 



RE: What is the recommended way to read AVRO data from Kafka using flink.

Alam, Zeeshan

Hi Stephan,

 

My AvroDeserializationSchema worked fine with a different Kafka topic; it seems the previous topic contained heterogeneous data, with both Avro- and JSON-formatted records. Thanks for your time. :)

 

Thanks & Regards

Zeeshan Alam

 

From: Stephan Ewen [mailto:[hidden email]]
Sent: Thursday, August 04, 2016 6:00 PM
To: [hidden email]
Subject: Re: What is the recommended way to read AVRO data from Kafka using flink.

 
