Need a working example to read/write avro data using FlinkKafkaProducer / Consumer


kaniska Mandal
I am facing some issues while reading / writing Avro data.

Attached are the corresponding files and the Avro-generated POJO.

Any clues what's wrong here? I may be missing some simple step!

A) << producer >> BeamKafkaFlinkAvroProducerTest 

>> If I use KafkaProducer directly (i.e. call produceSimpleData..), things work fine (just for testing).

>> Using FlinkKafkaProducer as an UnboundedSource (this is what I should do)

(i.e. I call produceAvroData2..) with the following steps ...

1) First, if I use >> AvroSerializationSchema schema = new AvroSerializationSchema(Test.class);

i.e. essentially using Avro's org.apache.avro.specific.SpecificDatumWriter, I face the following error >>

Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.avro.generic.IndexedRecord

at org.apache.avro.generic.GenericData.getField(GenericData.java:580)

at org.apache.avro.generic.GenericData.getField(GenericData.java:595)

at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:112)

at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
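
That cast error usually means a plain String reached Avro's datum writer where an IndexedRecord (the generated POJO) was expected. As a sanity check outside Flink, here is a minimal sketch of the serialize logic such a schema needs, using only the Avro API; the `AvroBytes` class name and the inline schema are hypothetical stand-ins for the attached generated Test class:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class AvroBytes {

    // This is the body one would put inside SerializationSchema.serialize(..).
    // The element must be an IndexedRecord (e.g. the generated Test POJO),
    // not a String -- passing a String is what triggers the
    // ClassCastException in the stack trace above.
    static byte[] serialize(IndexedRecord record) throws IOException {
        DatumWriter<IndexedRecord> writer = new GenericDatumWriter<>(record.getSchema());
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(record, encoder);
        encoder.flush();
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical schema matching the {"uname": "Joe", "id": 6} record
        // printed by the Kafka console consumer later in this post.
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Test\",\"fields\":["
            + "{\"name\":\"uname\",\"type\":\"string\"},"
            + "{\"name\":\"id\",\"type\":\"int\"}]}");

        GenericRecord rec = new GenericData.Record(schema);
        rec.put("uname", "Joe");
        rec.put("id", 6);

        byte[] bytes = serialize(rec);

        // Round-trip the bytes to confirm they are valid Avro.
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        GenericRecord back = new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
        System.out.println(back.get("uname") + " " + back.get("id"));
    }
}
```

If this round-trips cleanly but the Flink job still fails, the problem is in what the pipeline hands to the schema, not in the Avro serialization itself.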


2) Next, if I use TypeInformationSerializationSchema (irrespective of AvroCoder in the Pipeline), things apparently work fine

.. as the Kafka console consumer tool prints the message >>   java.lang.String{"uname": "Joe", "id": 6}


B) << consumer >> BeamKafkaFlinkAvroConsumerTest

>> I understand we should either use TypeInformationSerializationSchema in both consumer and producer, OR

use AvroDeserializationSchema and AvroSerializationSchema in the consumer and producer respectively.

But irrespective of using AvroDeserializationSchema or TypeInformationSerializationSchema, I get the following exception >>

Exception in thread "main" java.lang.NullPointerException: null value in entry: V=null

at com.google.common.collect.CollectPreconditions.checkEntryNotNull(CollectPreconditions.java:33)

at com.google.common.collect.SingletonImmutableBiMap.<init>(SingletonImmutableBiMap.java:39)

at com.google.common.collect.ImmutableBiMap.of(ImmutableBiMap.java:49)

at com.google.common.collect.ImmutableMap.of(ImmutableMap.java:70)

at org.apache.beam.sdk.coders.CoderRegistry.getDefaultOutputCoder(CoderRegistry.java:221)
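
A NullPointerException inside CoderRegistry.getDefaultOutputCoder generally means Beam could not infer a coder for the element type. A sketch of the usual fix, assuming the generated Test class and a Pipeline `p` as in the attached files (the exact registration method may differ by Beam version; this is a configuration fragment, not the author's verified code):

```java
// Register AvroCoder for the generated Avro class so the registry can
// infer a coder for PCollection<Test>.
p.getCoderRegistry().registerCoder(Test.class, AvroCoder.of(Test.class));

// Or set the coder explicitly on the PCollection that triggers the error:
// records.setCoder(AvroCoder.of(Test.class));
```

Either form should stop getDefaultOutputCoder from resolving to a null coder for the record type.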


Thanks

Kaniska


Attachments:
- BeamKafkaFlinkAvroConsumerTest.java (7K)
- BeamKafkaFlinkAvroProducerTest.java (10K)
- User.java (1K)
- Test.java (8K)
Re: Need a working example to read/write avro data using FlinkKafkaProducer / Consumer

Maximilian Michels
Hi Kaniska,

I've replied to your mail on the Beam user mailing list.

Cheers,
Max

On Wed, Apr 27, 2016 at 4:56 PM, kaniska Mandal
<[hidden email]> wrote:

> [original message quoted in full; snipped]