I might be misunderstanding Flink's Avro support. I assumed that not including a field in "CREATE TABLE" would work fine, but if I leave out any field that comes before a nested row, the job fails; if I include all of the fields, it succeeds. I assumed fields would be optional. I'm using Flink v1.11.1 with the Table SQL API.

Problem

If I do not include one of the fields, I get the following exception. If I add back the missing field, "contentId", this works.

"CREATE TABLE `default.mydb.mytable` (\n" +

Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to org.apache.avro.generic.IndexedRecord
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema.java:203)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createNullableConverter$c3bac5d8$1(AvroRowDataDeserializationSchema.java:221)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema.java:206)
    at org.apache.flink.formats.avro.AvroFileSystemFormatFactory$RowDataAvroInputFormat.nextRecord(AvroFileSystemFormatFactory.java:204)
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
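For context, a minimal sketch of the kind of setup being described. The table name, path, and every column other than "contentId" are invented for illustration, since the actual CREATE TABLE statement is truncated above; what seems to happen is that the DDL schema is used as the Avro reader schema, so dropping a field shifts positions relative to the writer schema in the files.

// Hypothetical sketch (Flink 1.11, Blink planner). Table name, path and all
// columns except "contentId" are invented; the real schema is not shown here.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AvroFilesystemRepro {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Register a filesystem table backed by Avro files.
        tEnv.executeSql(
                "CREATE TABLE mytable (\n"
                + "  contentId BIGINT,\n"                       // omitting this line reproduces the error
                + "  details ROW<name STRING, score DOUBLE>\n"  // nested row after the omitted field
                + ") WITH (\n"
                + "  'connector' = 'filesystem',\n"
                + "  'path' = 'file:///tmp/avro-input',\n"
                + "  'format' = 'avro'\n"
                + ")");

        // Reading is where the ClassCastException surfaces: the DDL schema is
        // converted into the Avro reader schema, so a dropped field no longer
        // lines up with the writer schema of the files.
        tEnv.executeSql("SELECT * FROM mytable").print();
    }
}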
Hi Dan,

I'd say this is a result of a few assumptions. However, I think it is a valid scenario to support for the combination of Avro + filesystem. Honestly, we already do that for the avro-schema-registry format, where we look up the writer schema in the Schema Registry and convert to the schema of the DDL. Moreover, it should be relatively easy to do.

@Jingsong What do you think?

Best,
Dawid
On 16/09/2020 09:49, Dan Hill wrote:
Interesting. How does schema evolution work with Avro and Flink? E.g. adding new fields or enum values.

On Wed, Sep 16, 2020 at 12:13 PM Dawid Wysakowicz <[hidden email]> wrote:
Hi Dan,

It depends which part of the system you have in mind. Generally, though, Avro itself does need the original schema the record was written with. There are a couple of alternatives.

For DataStream you have RegistryAvroDeserializationSchema, which looks up the old (writer) schema in the schema registry; it will be exposed in SQL in 1.12 [1].

If we are talking about schema migration for state objects, we persist the schema with which the state was written, so that upon restore we have both the old schema and, possibly, the new schema.

In the case of Avro files we could/should probably use the schema from the file.

Best,
Dawid
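For the Kafka path, a minimal sketch of that DataStream variant using the Confluent-specific subclass, ConfluentRegistryAvroDeserializationSchema. The topic, reader schema, and registry URL below are placeholders, not taken from the thread.

// Hypothetical sketch (DataStream API, Flink 1.11). The deserialization schema
// fetches the writer schema from the Confluent Schema Registry per record and
// resolves it against the reader schema supplied here.
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class RegistryAvroExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Reader schema the job wants to see; it may be newer than what was written.
        Schema readerSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
                + "{\"name\":\"contentId\",\"type\":\"long\"},"
                + "{\"name\":\"source\",\"type\":\"string\",\"default\":\"unknown\"}]}");

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "avro-demo");

        FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
                "events",                                        // placeholder topic
                ConfluentRegistryAvroDeserializationSchema.forGeneric(
                        readerSchema, "http://localhost:8081"),  // placeholder registry URL
                props);

        env.addSource(consumer).print();
        env.execute("registry-avro-demo");
    }
}

The registry lookup is what removes the need to ship the writer schema with every job: each record carries a schema id, and the reader schema in the job only has to be resolvable against whatever writer schema that id points to.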
[1] https://issues.apache.org/jira/browse/FLINK-16048

On 16/09/2020 21:20, Dan Hill wrote:
Thanks Dawid! You answered most of my questions:

1) Kafka to Flink - Is the most common practice to use the Confluent Schema Registry and then use ConfluentRegistryAvroDeserializationSchema?
2) Flink to state - great.
3) Flink to Avro file output - great.
4) Avro file output to Flink (batch) - Yes, I assumed this converts from the schema in the file to the SQL schema.

I'm sorry if this was a basic question. The previous systems I've designed used Protobufs, where evolution was a lot easier (as long as tags stay backwards compatible).

On Thu, Sep 17, 2020 at 3:33 AM Dawid Wysakowicz <[hidden email]> wrote:
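On the Protobuf comparison above: in Avro the backwards-compatibility hook is a field default rather than a tag number, and the reader always needs the writer schema to resolve old records. A minimal sketch of that resolution with plain Avro, outside of Flink and with invented record/field names:

// Hypothetical sketch: a record written with an old schema is read back with
// a newer reader schema that adds a field with a default.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class AvroEvolutionExample {
    public static void main(String[] args) throws Exception {
        Schema writerSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
                + "{\"name\":\"contentId\",\"type\":\"long\"}]}");
        // Reader schema adds "source" with a default, so old records still decode.
        Schema readerSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
                + "{\"name\":\"contentId\",\"type\":\"long\"},"
                + "{\"name\":\"source\",\"type\":\"string\",\"default\":\"unknown\"}]}");

        // Write one record with the old (writer) schema.
        GenericRecord record = new GenericData.Record(writerSchema);
        record.put("contentId", 42L);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(writerSchema).write(record, encoder);
        encoder.flush();

        // Read it back: Avro needs BOTH schemas to resolve old data into the new shape.
        BinaryDecoder decoder = DecoderFactory.get()
                .binaryDecoder(new ByteArrayInputStream(out.toByteArray()), null);
        GenericRecord decoded = new GenericDatumReader<GenericRecord>(
                writerSchema, readerSchema).read(null, decoder);
        System.out.println(decoded); // {"contentId": 42, "source": "unknown"}
    }
}

The schema registry (for Kafka) and the Avro file header (for files) are just two ways of making that writer schema available to the reader automatically.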