Error: Avro "CREATE TABLE" with nested rows and missing fields

Dan
I might be misunderstanding Flink's Avro support. I assumed that not including a field in "CREATE TABLE" would work fine, but if I leave out any field that comes before a nested row, the job fails when reading from the table. If I include all of the fields, it succeeds. I assumed fields would be optional.

I'm using Flink v1.11.1 with the Table SQL API.

Problem
If I do not include one of the fields, I get the exception below. If I add back the missing field, "contentId", it works.
"CREATE TABLE `default.mydb.mytable` (\n" +
"`userId` STRING, \n" +
"`timeEpochMillis` BIGINT, \n" +
//"`contentId` BIGINT, \n" +
"`contentDetails` ROW<\n" +
"`contentId` BIGINT >\n" +
") WITH (...)\n"

Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to org.apache.avro.generic.IndexedRecord
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema.java:203)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createNullableConverter$c3bac5d8$1(AvroRowDataDeserializationSchema.java:221)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema.java:206)
    at org.apache.flink.formats.avro.AvroFileSystemFormatFactory$RowDataAvroInputFormat.nextRecord(AvroFileSystemFormatFactory.java:204)
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
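
For reference, the variant that works for me (all fields declared), run through executeSql. The connector options are my guess based on the stack trace (filesystem + avro); the path is just a placeholder:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CreateTableWithAllFields {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        tEnv.executeSql(
                "CREATE TABLE `default.mydb.mytable` (\n" +
                "  `userId` STRING,\n" +
                "  `timeEpochMillis` BIGINT,\n" +
                "  `contentId` BIGINT,\n" +                      // keeping this field avoids the exception
                "  `contentDetails` ROW<\n" +
                "    `contentId` BIGINT >\n" +
                ") WITH (\n" +
                "  'connector' = 'filesystem',\n" +              // guessed from the stack trace
                "  'path' = 'file:///path/to/avro/files',\n" +   // placeholder
                "  'format' = 'avro'\n" +
                ")");
    }
}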

Dawid Wysakowicz-2

Hi Dan,

I'd say this is a result of a few assumptions.

  1. We try to separate the concept of a format from the connector, so we do not make many assumptions about which connector a format is used with.
  2. Avro needs the original schema that the incoming record was serialized with. It will not work with just the expected schema, even if that schema is "compatible" with the old one.
  3. The most common use case for Avro that we see is stream processing (e.g. Avro-encoded messages in Kafka). In that scenario the schema is not stored alongside the data, so we treat the DDL as the only source of truth for the schema of the data (the same holds for other formats such as JSON or CSV). I agree that in the case of Avro files the original schema is encoded in the files, but using it would contradict the assumption that the DDL is the source of truth.

However, I think supporting this for the combination of avro + filesystem is a valid scenario. We already do something similar for the avro-schema-registry format (where we look up the writer schema in the schema registry and convert it to the schema of the DDL), and it should be relatively easy to do here as well. @Jingsong, what do you think?
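
A minimal illustration of point 2, independent of Flink: Avro can resolve a record written with an old schema against a newer reader schema, but only if the decoder is given both; with only the expected schema the bytes are misread (or the decode fails outright). For Avro files the bytes are decoded with the file's own schema, but the converters we build from the DDL then pick fields by position, which is, as far as I can tell, where your ClassCastException comes from.

// Sketch only: plain Avro, no Flink involved. Shows that resolving an old
// (writer) schema against a new (reader) schema requires both schemas.
import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class WriterVsReaderSchema {
    public static void main(String[] args) throws Exception {
        // Writer schema: the schema the data was actually serialized with.
        Schema writer = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"E\",\"fields\":["
                        + "{\"name\":\"contentId\",\"type\":\"long\"},"
                        + "{\"name\":\"timeEpochMillis\",\"type\":\"long\"}]}");
        // Reader schema: what the consumer (e.g. the DDL) expects -- one field fewer.
        Schema reader = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"E\",\"fields\":["
                        + "{\"name\":\"timeEpochMillis\",\"type\":\"long\"}]}");

        GenericRecord rec = new GenericData.Record(writer);
        rec.put("contentId", 42L);
        rec.put("timeEpochMillis", 1600000000000L);

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder enc = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(writer).write(rec, enc);
        enc.flush();

        // Works: the resolving reader knows both schemas and skips contentId.
        GenericDatumReader<GenericRecord> resolving = new GenericDatumReader<>(writer, reader);
        System.out.println(resolving.read(null,
                DecoderFactory.get().binaryDecoder(out.toByteArray(), null)));

        // Misreads: with only the reader schema, the first long in the stream
        // (contentId) is interpreted as timeEpochMillis.
        GenericDatumReader<GenericRecord> readerOnly = new GenericDatumReader<>(reader);
        System.out.println(readerOnly.read(null,
                DecoderFactory.get().binaryDecoder(out.toByteArray(), null)));
    }
}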

Best,

Dawid


Dan
Interesting.  How does schema evolution work with Avro and Flink?  E.g. adding new fields or enum values.

Dawid Wysakowicz-2

Hi Dan,

It depends on which part of the system you have in mind. Generally, Avro itself does need the original schema that a record was written with, but there are a couple of alternatives. For the DataStream API there is RegistryAvroDeserializationSchema, which looks up the writer schema in a schema registry (it will be exposed in SQL in 1.12 [1]). If we are talking about schema migration for state objects, we persist the schema the state was written with, so that upon restore we have both the old schema and, possibly, the new one.

In the case of Avro files we could/should probably use the schema from the file.
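
For the Kafka case, a rough sketch of what that looks like with the Confluent flavour on the DataStream API (the topic name, registry URL, and reader schema below are placeholders, not something from your setup):

// Sketch only (Flink 1.11, flink-avro-confluent-registry + flink-connector-kafka).
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class RegistryAvroFromKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The schema this job expects (the reader schema). The writer schema of each
        // incoming record is looked up in the registry, so records written with a
        // different but compatible schema can still be resolved against this one.
        Schema readerSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
                        + "{\"name\":\"userId\",\"type\":[\"null\",\"string\"],\"default\":null},"
                        + "{\"name\":\"timeEpochMillis\",\"type\":\"long\"}]}");

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");    // placeholder
        props.setProperty("group.id", "avro-demo");                  // placeholder

        FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
                "events",                                             // placeholder topic
                ConfluentRegistryAvroDeserializationSchema.forGeneric(
                        readerSchema, "http://localhost:8081"),       // placeholder registry URL
                props);

        env.addSource(consumer).print();
        env.execute("registry-avro-demo");
    }
}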

Best,

Dawid


[1] https://issues.apache.org/jira/browse/FLINK-16048

Dan
Thanks Dawid!

You answered most of my questions:
1) Kafka to Flink - Is the most common practice to use the Confluent Schema Registry and then use ConfluentRegistryAvroDeserializationSchema?
2) Flink to State - great.
3) Flink to Avro file output - great.
4) Avro file output to Flink (batch) - Yes, I assumed this would convert from the schema in the file to the SQL schema.

I'm sorry if this was a basic question. The previous systems I've designed used Protobufs, where evolution was a lot easier (as long as the tags stay backwards compatible).


