We are using AvroRowDeserializationSchema with a Kafka table source to deserialize messages. The application fails with "Failed to deserialize Avro record.", and it seems to happen for different messages.
Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -26

Caused by: java.lang.ArrayIndexOutOfBoundsException: -49
    at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:424) ~[avro-1.8.2.jar:1.8.2]

At this time we do not know which serialization mechanism the producer uses to publish the messages. Are the errors above related to https://issues.apache.org/jira/browse/FLINK-16048? Any suggestions on fixing them? We are using Flink 1.10.
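For context on the two errors: Avro's binary encoding writes string/bytes lengths and union branch indexes as zigzag-encoded varints. If the decoder starts reading at the wrong position, an ordinary data byte can decode to a negative length or index. This can happen because of unexpected prefix bytes or a writer/reader schema mismatch.

The sketch below is a minimal JDK-only zigzag decoder. The class and method names are hypothetical and are not Avro's API. The bytes in main are illustrative and are not taken from the failing records. A single byte 0x33 decodes to -26 and 0x61 decodes to -49, which are the values in the errors above. This only shows how such values can arise. It does not identify which bytes were actually in the records.

```java
/**
 * Minimal sketch of how Avro's binary encoding reads a length prefix or
 * union index: a zigzag-encoded variable-length integer. If the reader is
 * positioned on bytes that were not written as a length, the decoded
 * value can be negative.
 */
class ZigZagDemo {

    /** Decodes one zigzag varint starting at {@code offset}. */
    static long readZigZagLong(byte[] buf, int offset) {
        long raw = 0;
        int shift = 0;
        int i = offset;
        while (true) {
            if (i >= buf.length) {
                throw new IllegalArgumentException("truncated varint");
            }
            int b = buf[i++] & 0xFF;
            // Low 7 bits carry data; the high bit means "more bytes follow".
            raw |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                break;
            }
            shift += 7;
            if (shift > 63) {
                throw new IllegalArgumentException("varint too long");
            }
        }
        // Zigzag mapping: 0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, ...
        return (raw >>> 1) ^ -(raw & 1);
    }

    public static void main(String[] args) {
        // Hypothetical bytes, chosen for illustration only.
        byte[] validLength = {0x0A}; // zigzag 10 -> 5
        byte[] misreadA = {0x33};    // zigzag 51 -> -26
        byte[] misreadB = {0x61};    // zigzag 97 -> -49
        System.out.println(readZigZagLong(validLength, 0)); // 5
        System.out.println(readZigZagLong(misreadA, 0));    // -26
        System.out.println(readZigZagLong(misreadB, 0));    // -49
    }
}
```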
It's rather hard to help if we don't know the format in which the records are serialized. There is a significant difference between using a schema registry or not. All schema registries known to me prepend the actual data with some kind of magic byte and an identifier of the schema. Therefore, if we do not know to expect that, we cannot properly deserialize the record.

Nevertheless, I would not say the problem has something to do with the schema registry. If I understand you correctly, some records can be deserialized. If they had been produced with schema-registry-style serialization, all of them would fail.

What I can recommend is to log/identify a record that cannot be deserialized and debug AvroRowDeserializationSchema with it.

Best,
Dawid

On 06/06/2020 16:27, Ramana Uppala wrote:
> We are using AvroRowDeserializationSchema with a Kafka table source to deserialize messages.
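A failing record can be checked for the prefix Dawid describes. This is a minimal sketch assuming the Confluent Schema Registry framing: a magic byte 0x00, then a 4-byte big-endian schema ID, then the Avro binary payload. The class name and sample bytes are hypothetical.

It is a heuristic. A plain Avro payload can also begin with 0x00, for example when its first field is an int 0, an empty string, or union branch 0. A positive result is a strong hint, not proof.

```java
import java.nio.ByteBuffer;
import java.util.OptionalInt;

/**
 * Sketch: inspect a raw Kafka record value and report whether it looks like
 * Confluent Schema Registry framing (magic byte 0x00 + 4-byte schema ID).
 */
class WireFormatInspector {

    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_LENGTH = 5; // 1 magic byte + 4-byte schema ID

    /** Returns the schema ID if the value starts with the registry header. */
    static OptionalInt registrySchemaId(byte[] value) {
        if (value == null || value.length < HEADER_LENGTH || value[0] != MAGIC_BYTE) {
            return OptionalInt.empty();
        }
        // ByteBuffer is big-endian by default, matching the framing.
        return OptionalInt.of(ByteBuffer.wrap(value, 1, 4).getInt());
    }

    public static void main(String[] args) {
        // Hypothetical sample values.
        byte[] framed = {0x00, 0x00, 0x00, 0x00, 0x2A, 0x0A, 0x68};
        byte[] plain = {0x0A, 0x68, 0x65, 0x6C, 0x6C, 0x6F};
        System.out.println(registrySchemaId(framed)); // OptionalInt[42]
        System.out.println(registrySchemaId(plain));  // OptionalInt.empty
    }
}
```

Logging this result together with the topic and offset of each failing record would show whether the failures line up with registry-framed topics.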
If data is coming from Kafka, the writer schema is most likely stored in a Schema Registry. If so, you absolutely need to use ConfluentRegistryAvroDeserializationSchema from the flink-avro-confluent-registry package.

If you didn't opt for that most common architecture pattern, you often run into a mismatch between the writer schema and the reader schema you supply. That could also be the case here (but try the registry-aware deserialization schema first). If the problem persists, please elaborate on how you manage the schema. It would also help to see an example record and the schema, if possible.

On Tue, Jun 9, 2020 at 11:10 AM Dawid Wysakowicz <[hidden email]> wrote:
> It's rather hard to help if we don't know the format in which the records are serialized.

--
Arvid Heise | Senior Java Developer
Follow us @VervericaData
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng
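With registry framing, the writer schema is not in the message. It lives in the registry, keyed by the ID in the header, and a registry-aware deserializer fetches it and resolves it against the reader schema you pass in.

As a hedged illustration of that lookup, the sketch below only builds the Confluent Schema Registry REST endpoint GET /schemas/ids/{id} for a given ID. It sends no request. The base URL and class name are hypothetical.

Fetching that endpoint manually for an ID taken from a failing record is one way to obtain the writer schema Arvid asks to see.

```java
import java.net.URI;

/** Sketch: build the registry URL that returns the writer schema for an ID. */
class WriterSchemaLookup {

    static URI writerSchemaUri(String registryBaseUrl, int schemaId) {
        // Tolerate a trailing slash on the configured base URL.
        String base = registryBaseUrl.endsWith("/")
                ? registryBaseUrl.substring(0, registryBaseUrl.length() - 1)
                : registryBaseUrl;
        return URI.create(base + "/schemas/ids/" + schemaId);
    }

    public static void main(String[] args) {
        // Hypothetical registry address and schema ID.
        System.out.println(writerSchemaUri("http://localhost:8081/", 42));
        // http://localhost:8081/schemas/ids/42
    }
}
```

The registry answers with a JSON object whose "schema" field holds the writer schema as a string. Comparing it with the reader schema configured in the job shows whether the two differ.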
Hi Arvid / Dawid,
Yes, we did a small POC with a custom Avro row deserializer that uses ConfluentRegistryAvroDeserializationSchema, and we are able to parse the messages.

We have a Schema Registry, and users are given the choice to produce with different serialization mechanisms. Some messages we can parse with AvroRowDeserializationSchema and some we can't. Our understanding is that the topics with failing messages are produced with Confluent serialization.

Is there a uniform Avro row deserialization schema that works in all scenarios?

On 2020/06/09 11:03:23, Arvid Heise <[hidden email]> wrote:
> If data is coming from Kafka, the writer schema is most likely stored in a Schema Registry.
Good to hear.
There is no schema that supports all of these ways. I would also rather discourage such an approach, as it makes it really hard to change the records' schema. I would strongly recommend using the schema registry for all records.

If you still want a schema that works for both, you could implement one based on both ConfluentRegistryAvroDeserializationSchema and AvroRowDeserializationSchema that checks for the magic byte. If the magic byte is present, deserialize with ConfluentRegistryAvroDeserializationSchema; if it is not, deserialize with AvroRowDeserializationSchema. But again, I'd rather discourage such an approach.

Best,
Dawid

On 09/06/2020 14:21, Ramana Uppala wrote:
> Is there a uniform Avro row deserialization schema that works in all scenarios?
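The magic-byte fallback can be sketched as a small dispatcher. To stay self-contained, it does not use Flink's classes. RecordDecoder is a hypothetical stand-in for the two real deserialization schemas, and the class name is hypothetical as well.

Both decoders receive the full message, since a registry-aware decoder consumes the 5-byte header itself. The same caveat as any magic-byte check applies: plain Avro data can also start with 0x00.

```java
/**
 * Sketch of routing on the magic byte. RecordDecoder is a hypothetical
 * stand-in for ConfluentRegistryAvroDeserializationSchema and
 * AvroRowDeserializationSchema; no Flink API is used here.
 */
class MagicByteDispatcher<T> {

    /** Hypothetical stand-in for a deserialization schema. */
    interface RecordDecoder<R> {
        R decode(byte[] message);
    }

    private static final byte MAGIC_BYTE = 0x0;
    private static final int HEADER_LENGTH = 5; // magic byte + 4-byte schema ID

    private final RecordDecoder<T> registryDecoder;
    private final RecordDecoder<T> plainDecoder;

    MagicByteDispatcher(RecordDecoder<T> registryDecoder, RecordDecoder<T> plainDecoder) {
        this.registryDecoder = registryDecoder;
        this.plainDecoder = plainDecoder;
    }

    T deserialize(byte[] message) {
        // Heuristic: plain Avro can also begin with 0x00 (e.g. an int 0,
        // an empty string, or union branch 0 as the first encoded value).
        boolean framed = message.length >= HEADER_LENGTH && message[0] == MAGIC_BYTE;
        return framed ? registryDecoder.decode(message) : plainDecoder.decode(message);
    }

    public static void main(String[] args) {
        MagicByteDispatcher<String> dispatcher = new MagicByteDispatcher<>(
                msg -> "registry:" + msg.length,
                msg -> "plain:" + msg.length);
        System.out.println(dispatcher.deserialize(new byte[] {0, 0, 0, 0, 1, 2})); // registry:6
        System.out.println(dispatcher.deserialize(new byte[] {2, 104}));           // plain:2
    }
}
```

In Flink 1.10 the two real schemas do not share an output type. The registry schema yields Avro records such as GenericRecord, while AvroRowDeserializationSchema yields Row, so a real wrapper would also need a conversion step. That extra moving part is one more reason Dawid recommends putting every producer on the registry instead.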