Hi,
Our Avro schema contains an Array<String> type. We created a TableSchema from the Avro schema and created a table in the catalog; in the catalog, this specific field's type is shown as ARRAY<VARCHAR(2147483647)>. We are using AvroRowDeserializationSchema with the connector, and the return type of the TableSource shows Array<String> mapped to LEGACY('ARRAY', 'ANY<[Ljava.lang.String;, ...') by AvroSchemaConverter.

When we run the application, the planner validates the physical types against the logical types, and we get an error of the form:

... of table field 'XYZ' does not match with the physical type ROW< ...

Any suggestions on how to resolve this? Is this a bug?
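For illustration, a minimal sketch of how the mismatch can be reproduced, assuming the Flink 1.10 Avro format classes; the record and field names in the schema string are made up:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;

public class AvroArrayTypeProbe {
    public static void main(String[] args) {
        // Avro record with a single array-of-string field (hypothetical example).
        String avroSchema =
            "{\"type\":\"record\",\"name\":\"Rec\",\"fields\":["
            + "{\"name\":\"XYZ\",\"type\":{\"type\":\"array\",\"items\":\"string\"}}]}";

        // Type information produced for the deserialized rows.
        TypeInformation<Row> produced = AvroSchemaConverter.convertToTypeInfo(avroSchema);
        System.out.println(produced);

        // Converting this legacy TypeInformation yields a LEGACY(...) data type for
        // the array field, while the catalog stores ARRAY<VARCHAR(2147483647)>,
        // which is the mismatch the planner reports.
        System.out.println(TypeConversions.fromLegacyInfoToDataType(produced));
    }
}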
Hi Ramana,
What connector do you use, or how do you instantiate the TableSource? Also, which catalog do you use and how do you register your table in that catalog?

The problem is that the conversion from TypeInformation to DataType produces legacy types (because they cannot be mapped exactly 1:1 to the new types). If you can change the code of the TableSource, you can return tableSchema.toRowDataType() from TableSource#getProducedDataType, where tableSchema is the schema coming from the catalog. Or you can make sure that the catalog table produces the legacy type:

TableSchema.builder().field("field", Types.OBJECT_ARRAY(Types.STRING)).build();

In 1.11 we will introduce new sources and formats that already work entirely with the new type system (AvroRowDataDeserializationSchema and KafkaDynamicTable).

Hope this helps a bit.

Best,
Dawid
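A minimal sketch of the first option, assuming the Flink 1.10 interfaces; the class name is made up and the data stream is only a placeholder, a real connector would read from Kafka and deserialize with an Avro deserialization schema there:

import java.util.Collections;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;

public class CatalogBackedTableSource implements StreamTableSource<Row> {

    // Schema read from the catalog table.
    private final TableSchema catalogSchema;

    public CatalogBackedTableSource(TableSchema catalogSchema) {
        this.catalogSchema = catalogSchema;
    }

    @Override
    public TableSchema getTableSchema() {
        return catalogSchema;
    }

    @Override
    public DataType getProducedDataType() {
        // Derive the physical type directly from the catalog schema, and do not
        // implement getReturnType(), so no legacy conversion is involved.
        return catalogSchema.toRowDataType();
    }

    @Override
    @SuppressWarnings("unchecked")
    public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
        // Placeholder source; only here to make the sketch complete.
        TypeInformation<Row> rowTypeInfo = (TypeInformation<Row>)
            TypeConversions.fromDataTypeToLegacyInfo(catalogSchema.toRowDataType());
        return env.fromCollection(Collections.<Row>emptyList(), rowTypeInfo);
    }
}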
Hi Dawid,

We are using a custom connector that is very similar to the Flink Kafka connector, and we instantiate the TableSchema using a custom class that maps Avro types to Flink's DataTypes via TableSchema.Builder. For the Array type, we have the following mapping:

case ARRAY:
    return DataTypes.ARRAY(toFlinkType(schema.getElementType()));

We are using the Hive catalog and creating tables using CatalogTableImpl with the TableSchema.

As you mentioned, if we create the TableSchema with legacy types, our connector works without any issues. However, we want to use the new Flink DataTypes API, and that is where we run into problems. One more observation: if we use legacy types when creating the TableSource, the application does not work with the Blink planner either; we get the same error about the physical type not matching.

Looking forward to the 1.11 changes.
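For reference, a sketch of the kind of mapping described above; the class and method names are hypothetical and only a few Avro types are shown:

import org.apache.avro.Schema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

public class AvroToDataTypeMapper {

    public static DataType toFlinkType(Schema schema) {
        switch (schema.getType()) {
            case STRING:
                return DataTypes.STRING();
            case INT:
                return DataTypes.INT();
            case LONG:
                return DataTypes.BIGINT();
            case BOOLEAN:
                return DataTypes.BOOLEAN();
            case ARRAY:
                // The mapping mentioned above: Avro array -> ARRAY<element type>.
                return DataTypes.ARRAY(toFlinkType(schema.getElementType()));
            default:
                throw new UnsupportedOperationException(
                    "Unsupported Avro type: " + schema.getType());
        }
    }
}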
To make sure we are on the same page: the end goal is to have CatalogTable#getTableSchema / TableSource#getTableSchema return a schema that is compatible with TableSource#getProducedDataType.

If you want to use the new types, you should not implement TableSource#getReturnType. Moreover, you should not use any Flink utilities that convert from TypeInformation to DataTypes, as those produce legacy types. I am aware that there are a lot of corner cases, and we worked hard to improve the situation with the new source and sink interfaces. Below I add an example of how you could pass different array types:
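A minimal sketch along those lines, assuming the Flink 1.10 table API; the field name and class name are made up. It declares the same array field with the new DataTypes and with legacy TypeInformation, and shows what the legacy conversion produces:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;

public class ArrayTypeExamples {
    public static void main(String[] args) {
        // New type system: an ARRAY<STRING> data type.
        DataType newArrayType = DataTypes.ARRAY(DataTypes.STRING());
        System.out.println(newArrayType);

        // Legacy type system: TypeInformation for String[].
        TypeInformation<String[]> legacyArrayType = Types.OBJECT_ARRAY(Types.STRING);

        // Converting the legacy TypeInformation produces a legacy data type, the
        // LEGACY('ARRAY', 'ANY<[Ljava.lang.String;...') shown in the planner
        // error from this thread.
        DataType converted = TypeConversions.fromLegacyInfoToDataType(legacyArrayType);
        System.out.println(converted);

        // Catalog schema declared entirely with the new types.
        TableSchema schema = TableSchema.builder()
            .field("tags", DataTypes.ARRAY(DataTypes.STRING()))
            .build();
        System.out.println(schema.toRowDataType());
    }
}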
Hope this helps, and that it will all be much easier in Flink 1.11.

Best,
Dawid