Hi,

I am using Flink 1.0.3 and FlinkKafkaConsumer08 to read AVRO data from Kafka. I have the AVRO schema file that was used to write the data into Kafka. In the documentation at https://ci.apache.org/projects/flink/flink-docs-release-0.8/example_connectors.html it is mentioned that using the GenericData.Record type is possible with Flink, but not recommended: since each record contains the full schema, it is very data-intensive and thus probably slow to use. So what is the recommended way to read AVRO data from Kafka using Flink?

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "dojo3xxxxx:9092,dojoxxxxx:9092,dojoxxxxx:9092");
    properties.setProperty("zookeeper.connect", "dojo3xxxxx:2181,dojoxxxxx:2181,dojoxxxxx:2181");
    properties.setProperty("group.id", "Zeeshantest");

    AvroDeserializationSchema<GenericData.Record> avroSchema =
            new AvroDeserializationSchema<>(GenericData.Record.class);
    FlinkKafkaConsumer08<GenericData.Record> kafkaConsumer =
            new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties);

    DataStream<GenericData.Record> messageStream = env.addSource(kafkaConsumer);
    messageStream.rebalance().print();
    env.execute("Flink AVRO KAFKA Test");
}

This is the AvroDeserializationSchema that I am using:

public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {

    private static final long serialVersionUID = 4330538776656642778L;

    private final Class<T> avroType;
    private transient DatumReader<T> reader;
    private transient BinaryDecoder decoder;

    public AvroDeserializationSchema(Class<T> avroType) {
        this.avroType = avroType;
    }

    @Override
    public T deserialize(byte[] message) {
        ensureInitialized();
        try {
            decoder = DecoderFactory.get().binaryDecoder(message, decoder);
            return reader.read(null, decoder);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(avroType);
    }

    private void ensureInitialized() {
        if (reader == null) {
            if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
                reader = new SpecificDatumReader<T>(avroType);
            } else {
                reader = new ReflectDatumReader<T>(avroType);
            }
        }
    }
}

On running this I am getting:

java.lang.Exception: Not a Specific class: class org.apache.avro.generic.GenericData$Record

Thanks & Regards,
Zeeshan Alam
Hi!

I think this is a known limitation of Flink 1.0, and it is fixed in Flink 1.1.

Here is the JIRA ticket: https://issues.apache.org/jira/browse/FLINK-3691

You could try the latest release candidate to get the fix; here is the vote thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-Apache-Flink-1-1-0-RC1-td12723.html

The release is also in progress, so the fix should be out in a stable release soon.

Greetings,
Stephan
Hi Stephan,

I went through one of the old mail threads:
http://mail-archives.apache.org/mod_mbox/flink-user/201510.mbox/%3CCANC1h_vq-TVjTNhXyYLoVso7GRGkdGWioM5Ppg%3DGoQPjvigqYg%40mail.gmail.com%3E

There it is mentioned that when reading from Kafka you are expected to define a DeserializationSchema, and that while there is no out-of-the-box (de)serializer for Flink with Kafka, it should not be very hard to add one.

I have some questions:

1. As per FLINK-3691 you are adding a GenericDatumReader, so I suppose I should use it instead of DatumReader in the DeserializationSchema that is required to read data from Kafka?
2. What is the recommended way to read AVRO binary data from Kafka if I have the AVRO schema file [*.avsc] with me? Is there a better, more efficient approach?
3. Can AvroInputFormat be used to read Kafka data, or is a DeserializationSchema a must for reading from Kafka? Also, AvroInputFormat doesn't have any JavaDoc with it.
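To make question 1 concrete, this is the kind of schema-based reader I have in mind (just a sketch on my side, pieced together from the Avro javadocs; the class name and example usage are made up, not from FLINK-3691):

```java
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

// Hypothetical helper: decodes Avro binary bytes into GenericRecord
// using the writer schema from the *.avsc file, instead of a record class.
public class GenericAvroDecoder {

    private final String schemaJson;                       // contents of the *.avsc file
    private transient GenericDatumReader<GenericRecord> reader;  // built lazily
    private transient BinaryDecoder decoder;               // reused across calls

    public GenericAvroDecoder(String schemaJson) {
        this.schemaJson = schemaJson;
    }

    public GenericRecord decode(byte[] message) throws IOException {
        if (reader == null) {
            // Schema is parsed lazily because Avro's Schema is not Serializable
            Schema schema = new Schema.Parser().parse(schemaJson);
            reader = new GenericDatumReader<>(schema);
        }
        decoder = DecoderFactory.get().binaryDecoder(message, decoder);
        return reader.read(null, decoder);
    }
}
```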
Thanks & Regards, Zeeshan Alam
Hi!

To read data from Kafka, you need a DeserializationSchema. You could create one that wraps the AvroInputFormat, but an AvroDeserializationSchema would simply be an adaptation of the AvroInputFormat to the DeserializationSchema interface. In your Avro DeserializationSchema, you can probably create the Avro readers internally from an Avro schema (I believe).

Greetings,
Stephan
Hi Stephan,

My AvroDeserializationSchema worked fine with a different Kafka topic; it seems the previous topic contained heterogeneous data, with both AVRO- and JSON-formatted messages in it. Thanks for your time :-)

Thanks & Regards,
Zeeshan Alam