Avro Array<String> type validation error


Avro Array<String> type validation error

Ramana Uppala-2
Hi,
Our Avro schema contains an Array<String> type. We created a TableSchema from the Avro schema and created a table in the catalog; in the catalog this specific field type is shown as ARRAY<VARCHAR(2147483647)>. We are using AvroRowDeserializationSchema with the connector, and the returnType of the TableSource shows Array<String> mapped to LEGACY('ARRAY', 'ANY<[Ljava.lang.String;, by AvroSchemaConverter.
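
Roughly, the relevant part of our setup looks like this (a sketch; the schema string is elided and names are illustrative):

String avroSchemaString = "...";
AvroRowDeserializationSchema deserializer =
    new AvroRowDeserializationSchema(avroSchemaString);
// getProducedType() reports the Array<String> field as
// LEGACY('ARRAY', 'ANY<[Ljava.lang.String;...') via AvroSchemaConverter
TypeInformation<Row> returnType = deserializer.getProducedType();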

When we run the application, the planner validates the physical types against the logical types, and we get an error along these lines (message truncated):

of table field 'XYZ' does not match with the physical type ROW<

Any suggestions on how to resolve this? Is this a bug?
Re: Avro Array<String> type validation error

Dawid Wysakowicz-2
Hi Ramana,

What connector do you use, or how do you instantiate the TableSource? Also, which catalog do you use, and how do you register your table in that catalog?

The problem is that the conversion from TypeInformation to DataType produces legacy types (because they cannot be mapped exactly 1:1 to the new types).

If you can change the code of the TableSource, you can return tableSchema.toRowDataType() from TableSource#getProducedDataType(), where tableSchema is the schema coming from the catalog. Alternatively, you can make sure that the catalog table produces the legacy type:

TableSchema.field("field", Types.OBJECT_ARRAY(Types.STRING));
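
For the first option, a sketch of what the TableSource could look like (the class name is made up; tableSchema is the schema read from the catalog):

public abstract class CatalogAlignedTableSource implements StreamTableSource<Row> {
    private final TableSchema tableSchema; // schema coming from the catalog

    protected CatalogAlignedTableSource(TableSchema tableSchema) {
        this.tableSchema = tableSchema;
    }

    @Override
    public TableSchema getTableSchema() {
        return tableSchema;
    }

    @Override
    public DataType getProducedDataType() {
        // derive the physical type from the logical catalog schema,
        // so the planner sees matching types during validation
        return tableSchema.toRowDataType();
    }
}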

In 1.11 we will introduce new sources and formats that work entirely with the new type system (AvroRowDataDeserializationSchema and KafkaDynamicTable).

Hope this helps a bit.

Best,

Dawid

Re: [External Sender] Re: Avro Array<String> type validation error

Ramana Uppala-2
Hi Dawid,

We are using a custom connector that is very similar to the Flink Kafka connector, and we instantiate the TableSchema using a custom class that maps Avro types to Flink's DataTypes via TableSchema.Builder.

For the ARRAY type, we have the following mapping:

case ARRAY:
    return DataTypes.ARRAY(toFlinkType(schema.getElementType()));
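
For context, the converter is shaped roughly like this (simplified; most Avro types elided):

private static DataType toFlinkType(Schema schema) {
    switch (schema.getType()) {
        case STRING:
            return DataTypes.STRING();
        case LONG:
            return DataTypes.BIGINT();
        case ARRAY:
            return DataTypes.ARRAY(toFlinkType(schema.getElementType()));
        default:
            throw new UnsupportedOperationException(
                "Unsupported Avro type: " + schema.getType());
    }
}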


We are using the Hive catalog and creating tables using CatalogTableImpl with the TableSchema.
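
The registration is roughly as follows (a sketch; catalog name, database, table name, and Hive conf dir are illustrative):

HiveCatalog catalog = new HiveCatalog("myhive", "default", "/path/to/hive-conf");
catalog.open();
CatalogTableImpl catalogTable = new CatalogTableImpl(
    tableSchema,          // built by our Avro -> DataTypes converter
    new HashMap<>(),      // connector properties
    "created from Avro"); // comment
catalog.createTable(new ObjectPath("default", "my_table"), catalogTable, false);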

As you mentioned, if we create the TableSchema with legacy types, our connector works without any issues. However, we want to use the new Flink DataTypes API, and that is where we run into problems.

Also, one more observation: if we use legacy types when creating the TableSource, the application does not work with the Blink planner. We get the same error about the physical type not matching.

Looking forward to the 1.11 changes.


Re: [External Sender] Re: Avro Array<String> type validation error

Dawid Wysakowicz-2

To make sure we are on the same page: the end goal is to have CatalogTable#getTableSchema / TableSource#getTableSchema return a schema that is compatible with TableSource#getProducedDataType.

If you want to use the new types, you should not implement TableSource#getReturnType. Moreover, you should not use any of the Flink utilities that convert from TypeInformation to DataTypes, as those produce legacy types.
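
A minimal illustration of the difference:

// avoid: converting from TypeInformation produces a LEGACY type
DataType legacyArray = TypeConversions.fromLegacyInfoToDataType(
    Types.OBJECT_ARRAY(Types.STRING));

// prefer: build the DataType directly in the new type system
DataType newArray = DataTypes.ARRAY(DataTypes.STRING());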

I am aware there are a lot of corner cases, and we worked hard to improve the situation with the new source and sink interfaces.

Below is an example of how you could pass different array types:

StreamExecutionEnvironment exec = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(exec);
tEnv.registerTableSource(
    "T",
    new StreamTableSource<Row>() {
        @Override
        public TableSchema getTableSchema() {
            return TableSchema.builder()
                .field("f0", DataTypes.ARRAY(DataTypes.BIGINT().notNull()))
                .field("f1", DataTypes.ARRAY(DataTypes.BIGINT()))
                .field("f2", DataTypes.ARRAY(DataTypes.STRING()))
                .build();
        }

        @Override
        public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
            return execEnv.fromCollection(
                Arrays.asList(Row.of(new long[]{1}, new Long[]{1L}, new String[]{"ABCDE"})),
                // this is necessary for the STRING array, because otherwise the DataStream
                // produces a different TypeInformation than the planner expects
                (TypeInformation<Row>) TypeConversions.fromDataTypeToLegacyInfo(getProducedDataType())
            );
        }


        @Override
        public DataType getProducedDataType() {
            return DataTypes.ROW(
                DataTypes.FIELD(
                    "f0",
                    DataTypes.ARRAY(DataTypes.BIGINT().notNull().bridgedTo(long.class))
                        .bridgedTo(long[].class)),
                DataTypes.FIELD(
                    "f1",
                    DataTypes.ARRAY(DataTypes.BIGINT())),
                DataTypes.FIELD(
                    "f2",
                    DataTypes.ARRAY(DataTypes.STRING()))
                );
        }
    });

Table table = tEnv.sqlQuery("SELECT f0, f1, f2 FROM T");
DataStream<Row> result = tEnv.toAppendStream(
    table,
    Types.ROW(
        Types.PRIMITIVE_ARRAY(Types.LONG),
        ObjectArrayTypeInfo.getInfoFor(Types.LONG),
        ObjectArrayTypeInfo.getInfoFor(Types.STRING)));
result.print();
exec.execute();

Hope this helps, and that it will be much easier in Flink 1.11.

Best,

Dawid
