This post was updated on .
Hello Team,
As we have two kafka connectors "upsert-kafka" and "kafka". I am facing issue with "upsert-kafka" while reading avro message serialized using "io.confluent.kafka.serializers.KafkaAvroDeserializer". Please note "kafka" connector is working while reading avro message serialized using "io.confluent.kafka.serializers.KafkaAvroDeserializer". Below are the definitions with both the Kafka connector:- *Table definition with "kafka"connector which is working fine.* /CREATE TABLE proposalLine (PROPOSAL_LINE_ID bigint,LAST_MODIFIED_BY String ) WITH ('connector' = 'kafka', 'properties.bootstrap.servers' = 'localhost:9092', '' = 'earliest', 'topic' = 'lndcdcadsprpslproposalline', 'format'='avro-confluent','avro-confluent.schema-registry.url' = ' <a href="http://localhost:8081'">http://localhost:8081', 'avro-confluent.schema-registry.subject' = 'lndcdcadsprpslproposalline-value') <<a href="http://localhost:8081'">http://localhost:8081'> / *Table definition and error with "upsert-kafka"connector which is not working fine.* / CREATE TABLE proposalLine (PROPOSAL_LINE_ID bigint,LAST_MODIFIED_BY STRING ,PRIMARY KEY (PROPOSAL_LINE_ID) NOT ENFORCED ) "WITH ('connector' = 'upsert-kafka', 'properties.bootstrap.servers' = 'localhost:9092', '' = 'earliest', 'topic' = 'lndcdcadsprpslproposalline', 'key.format' = 'avro', 'value.format' = 'avro', ''='dd', 'properties.schema.registry.url'=' <a href="http://localhost:8081'">http://localhost:8081', 'properties.key.deserializer'='org.apache.kafka.common.serialization.LongDeserializer', 'properties.value.deserializer'='io.confluent.kafka.serializers.KafkaAvroDeserializer') ERROR: Caused by: Failed to deserialize Avro record. at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize( at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize( at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize( at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize( at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler( at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop( at at at at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$ Caused by: java.lang.ArrayIndexOutOfBoundsException: -1 at$Alternative.getSymbol( at at org.apache.avro.generic.GenericDatumReader.readWithoutConversion( at at org.apache.avro.generic.GenericDatumReader.readField( at org.apache.avro.generic.GenericDatumReader.readRecord( at org.apache.avro.generic.GenericDatumReader.readWithoutConversion( at at at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize( at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize( ... 9 more <<a href="http://localhost:8081'">http://localhost:8081'> / Please help. Regards, Shamit -- Sent from: |
Hello Flink Users,
Request you to please help. I am facing issue with "KafkaAvroDeserializer" by using "upsert-kafka" connector. Regards, Shamit Jain -- Sent from: |
Hi Shamit, Why are specifying the upsert-kafka completely different? In particular, why did you set the serializer explicitly? I would have assumed that just setting the format to 'format'='avro-confluent' should be enough (same as in the working source). On Tue, Feb 9, 2021 at 11:06 PM Shamit <[hidden email]> wrote: Hello Flink Users, |
This post was updated on .
Hi Arvid,
Thanks for the response. I have tried without serializer and getting error. With "avro-confluent" it shows missing "schema-registry.url" although it is defined in the definition. Below is the screen shot. Request you to please help. CREATE TABLE test > (PROPOSAL_LINE_ID bigint,LAST_MODIFIED_BY String , FLIGHT_ID BIGINT,proctime as PROCTIME(), rowtime TIMESTAMP(3) METADATA FROM 'timestamp', > update_time TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL, PRIMARY KEY(PROPOSAL_LINE_ID) NOT ENFORCED > ) WITH ( > 'connector' = 'upsert-kafka', > 'properties.bootstrap.servers' = 'localhost:9092', > '' = 'earliest', > 'topic' = 'lndcdcadsprpslproposalline', > 'key.format' = 'avro-confluent', > 'value.format' = 'avro-confluent', > 'schema-registry.url' = '<a href="http://localhost:8081'">http://localhost:8081' > ); [ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.ValidationException: One or more required options are missing. Missing required options are: schema-registry.url Regards, Shamit Jain -- Sent from: |
In reply to this post by Arvid Heise-4
Hi Arvid,
Thanks for the response. I have tried without serializer and getting error. With "avro-confluent" it shows missing "schema-registry.url" although it is defined in the definition. Below is the screen shot. Request you to please help. <> Regards, Shamit Jain -- Sent from: |
Free forum by Nabble | Edit this page |