I am facing some issues while reading/writing Avro data. Attached are the corresponding files and the Avro-generated POJO. Any clues what's wrong here? Maybe I'm missing some simple step!

A) <<Producer>> BeamKafkaFlinkAvroProducerTest

If I use KafkaProducer directly (i.e. call produceSimpleData), things work fine (just for testing).

Using FlinkKafkaProducer as an UnboundedSource (this is what I should do), i.e. calling produceAvroData2, with the following steps:

1) First, if I use

    AvroSerializationSchema schema = new AvroSerializationSchema(Test.class);

i.e. essentially using Avro's org.apache.avro.specific.SpecificDatumWriter, I get the following error:

Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.avro.generic.IndexedRecord
        at org.apache.avro.generic.GenericData.getField(GenericData.java:580)
        at org.apache.avro.generic.GenericData.getField(GenericData.java:595)
        at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:112)
        at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
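For reference, below is a minimal sketch of what a Flink SerializationSchema wrapping Avro's SpecificDatumWriter typically looks like; it is an assumption that the attached AvroSerializationSchema is along these lines (the interface package path is the Flink 1.x one, and the field/variable names are illustrative). The key point is that SpecificDatumWriter.write() expects an Avro record, so if the element handed to serialize() is actually a java.lang.String (for instance, the record's toString() emitted by an upstream step), Avro fails with exactly the "cannot be cast to org.apache.avro.generic.IndexedRecord" error shown above.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.EncoderFactory;
    import org.apache.avro.specific.SpecificDatumWriter;
    import org.apache.avro.specific.SpecificRecord;
    import org.apache.flink.streaming.util.serialization.SerializationSchema;

    public class AvroSerializationSchema<T extends SpecificRecord>
            implements SerializationSchema<T> {

        private final Class<T> avroType;
        // DatumWriter is not serializable; rebuild it lazily on the task.
        private transient SpecificDatumWriter<T> writer;

        public AvroSerializationSchema(Class<T> avroType) {
            this.avroType = avroType;
        }

        @Override
        public byte[] serialize(T element) {
            if (writer == null) {
                writer = new SpecificDatumWriter<>(avroType);
            }
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            try {
                // Throws the ClassCastException above if `element` is not
                // actually an instance of the generated Avro record type.
                writer.write(element, encoder);
                encoder.flush();
            } catch (IOException e) {
                throw new RuntimeException("Avro serialization failed for " + avroType, e);
            }
            return out.toByteArray();
        }
    }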
2) Next, if I use TypeInformationSerializationSchema (irrespective of AvroCoder in the pipeline), things apparently work fine, as the Kafka console consumer tool prints the message:

    java.lang.String{"uname": "Joe", "id": 6}

B) <<Consumer>> BeamKafkaFlinkAvroConsumerTest

I understand we should either use TypeInformationSerializationSchema in both consumer and producer, OR use AvroDeserializationSchema and AvroSerializationSchema in the consumer and producer respectively. But irrespective of using AvroDeserializationSchema or TypeInformationSerializationSchema, I get the following exception:

Exception in thread "main" java.lang.NullPointerException: null value in entry: V=null
        at com.google.common.collect.CollectPreconditions.checkEntryNotNull(CollectPreconditions.java:33)
        at com.google.common.collect.SingletonImmutableBiMap.<init>(SingletonImmutableBiMap.java:39)
        at com.google.common.collect.ImmutableBiMap.of(ImmutableBiMap.java:49)
        at com.google.common.collect.ImmutableMap.of(ImmutableMap.java:70)
        at org.apache.beam.sdk.coders.CoderRegistry.getDefaultOutputCoder(CoderRegistry.java:221)
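For what it's worth, that NullPointerException comes out of Beam's CoderRegistry while it is trying to infer a default output coder, which usually means no coder could be determined for the transform's output type. A minimal sketch of one common way around that, assuming the consumer is meant to produce Test records (the `kafkaRecords` variable here is illustrative, not taken from the attached test): set the coder explicitly so inference never runs.

    import org.apache.beam.sdk.coders.AvroCoder;
    import org.apache.beam.sdk.values.PCollection;

    // Assumed: `kafkaRecords` is the PCollection<Test> the consumer test
    // builds from the Kafka read. Setting the coder explicitly means
    // CoderRegistry.getDefaultOutputCoder() is never consulted for it.
    kafkaRecords.setCoder(AvroCoder.of(Test.class));

Registering AvroCoder for Test.class in the pipeline's CoderRegistry is the other usual route to the same effect.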
Thanks
Kaniska

Attachments: BeamKafkaFlinkAvroConsumerTest.java, BeamKafkaFlinkAvroProducerTest.java, User.java, Test.java
Hi Kaniska,
I've replied to your mail on the Beam user mailing list.

Cheers,
Max

On Wed, Apr 27, 2016 at 4:56 PM, kaniska Mandal <[hidden email]> wrote:
> [original message quoted in full above]