Flink 1.11 test Parquet sink

classic Classic list List threaded Threaded
8 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink 1.11 test Parquet sink

Flavio Pompermaier
Hi to all,
I'm trying to test write to parquet using the following code but I have an error:

 final TableEnvironment tableEnv = DatalinksExecutionEnvironment.getBatchTableEnv();
    final Table inputTable = tableEnv.fromValues(//
        DataTypes.ROW(//
            DataTypes.FIELD("col1", DataTypes.STRING()), //
            DataTypes.FIELD("col2", DataTypes.STRING())//
        ), //
        Row.of(1L, "Hello"), //
        Row.of(2L, "Hello"), //
        Row.of(3L, ""), //
        Row.of(4L, "Ciao"));
    tableEnv.createTemporaryView("ParquetDataset", inputTable);
    tableEnv.executeSql(//
        "CREATE TABLE `out` (\n" + //
            "col1 STRING,\n" + //
            "col2 STRING\n" + //
            ") WITH (\n" + //
            " 'connector' = 'filesystem',\n" + //
            // " 'format' = 'parquet',\n" + //
            " 'update-mode' = 'append',\n" + //
            " 'path' = 'file://" + TEST_FOLDER + "',\n" + //
            " 'sink.shuffle-by-partition.enable' = 'true'\n" + //
            ")");

    tableEnv.executeSql("INSERT INTO `out` SELECT * FROM ParquetDataset");

---------------------------------

Exception in thread "main" java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
validated type:
RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col1, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col2) NOT NULL
converted type:
RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col1, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col2) NOT NULL
rel:
LogicalProject(col1=[$0], col2=[$1])
  LogicalUnion(all=[true])
    LogicalProject(col1=[_UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
      LogicalValues(tuples=[[{ 0 }]])
    LogicalProject(col1=[_UTF-16LE'2':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
      LogicalValues(tuples=[[{ 0 }]])
    LogicalProject(col1=[_UTF-16LE'3':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], col2=[_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
      LogicalValues(tuples=[[{ 0 }]])
    LogicalProject(col1=[_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], col2=[_UTF-16LE'Ciao':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
      LogicalValues(tuples=[[{ 0 }]])

at org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:465)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:580)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)


What is wrong with my code?
Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.11 test Parquet sink

Flavio Pompermaier
Sorry, obviously  " 'format' = 'parquet'" + is without comment :D

On Tue, Jul 14, 2020 at 4:48 PM Flavio Pompermaier <[hidden email]> wrote:
Hi to all,
I'm trying to test write to parquet using the following code but I have an error:

 final TableEnvironment tableEnv = DatalinksExecutionEnvironment.getBatchTableEnv();
    final Table inputTable = tableEnv.fromValues(//
        DataTypes.ROW(//
            DataTypes.FIELD("col1", DataTypes.STRING()), //
            DataTypes.FIELD("col2", DataTypes.STRING())//
        ), //
        Row.of(1L, "Hello"), //
        Row.of(2L, "Hello"), //
        Row.of(3L, ""), //
        Row.of(4L, "Ciao"));
    tableEnv.createTemporaryView("ParquetDataset", inputTable);
    tableEnv.executeSql(//
        "CREATE TABLE `out` (\n" + //
            "col1 STRING,\n" + //
            "col2 STRING\n" + //
            ") WITH (\n" + //
            " 'connector' = 'filesystem',\n" + //
            // " 'format' = 'parquet',\n" + //
            " 'update-mode' = 'append',\n" + //
            " 'path' = 'file://" + TEST_FOLDER + "',\n" + //
            " 'sink.shuffle-by-partition.enable' = 'true'\n" + //
            ")");

    tableEnv.executeSql("INSERT INTO `out` SELECT * FROM ParquetDataset");

---------------------------------

Exception in thread "main" java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
validated type:
RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col1, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col2) NOT NULL
converted type:
RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col1, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col2) NOT NULL
rel:
LogicalProject(col1=[$0], col2=[$1])
  LogicalUnion(all=[true])
    LogicalProject(col1=[_UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
      LogicalValues(tuples=[[{ 0 }]])
    LogicalProject(col1=[_UTF-16LE'2':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
      LogicalValues(tuples=[[{ 0 }]])
    LogicalProject(col1=[_UTF-16LE'3':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], col2=[_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
      LogicalValues(tuples=[[{ 0 }]])
    LogicalProject(col1=[_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], col2=[_UTF-16LE'Ciao':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
      LogicalValues(tuples=[[{ 0 }]])

at org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:465)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:580)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)


What is wrong with my code?

Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.11 test Parquet sink

Flavio Pompermaier
If I use 

final Table inputTable = tableEnv.fromValues(
        DataTypes.ROW(
            DataTypes.FIELD("col1", DataTypes.STRING().notNull()),
            DataTypes.FIELD("col2", DataTypes.STRING().notNull())
        ), ..
  tableEnv.executeSql(//
        "CREATE TABLE `out` (" +
            "col1 STRING," + 
            "col2 STRING" + 
            ") WITH (...)

the job works as expected but this is wrong IMHO because DataTypes.STRING() = DataTypes.STRING().nullable() by default.
If I have DataTypes.STRING().notNull() the type in the CREATE TABLE should be "STRING NOT NULL" . Am I correct?

On Tue, Jul 14, 2020 at 4:50 PM Flavio Pompermaier <[hidden email]> wrote:
Sorry, obviously  " 'format' = 'parquet'" + is without comment :D

On Tue, Jul 14, 2020 at 4:48 PM Flavio Pompermaier <[hidden email]> wrote:
Hi to all,
I'm trying to test write to parquet using the following code but I have an error:

 final TableEnvironment tableEnv = DatalinksExecutionEnvironment.getBatchTableEnv();
    final Table inputTable = tableEnv.fromValues(//
        DataTypes.ROW(//
            DataTypes.FIELD("col1", DataTypes.STRING()), //
            DataTypes.FIELD("col2", DataTypes.STRING())//
        ), //
        Row.of(1L, "Hello"), //
        Row.of(2L, "Hello"), //
        Row.of(3L, ""), //
        Row.of(4L, "Ciao"));
    tableEnv.createTemporaryView("ParquetDataset", inputTable);
    tableEnv.executeSql(//
        "CREATE TABLE `out` (\n" + //
            "col1 STRING,\n" + //
            "col2 STRING\n" + //
            ") WITH (\n" + //
            " 'connector' = 'filesystem',\n" + //
            // " 'format' = 'parquet',\n" + //
            " 'update-mode' = 'append',\n" + //
            " 'path' = 'file://" + TEST_FOLDER + "',\n" + //
            " 'sink.shuffle-by-partition.enable' = 'true'\n" + //
            ")");

    tableEnv.executeSql("INSERT INTO `out` SELECT * FROM ParquetDataset");

---------------------------------

Exception in thread "main" java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
validated type:
RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col1, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col2) NOT NULL
converted type:
RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col1, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col2) NOT NULL
rel:
LogicalProject(col1=[$0], col2=[$1])
  LogicalUnion(all=[true])
    LogicalProject(col1=[_UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
      LogicalValues(tuples=[[{ 0 }]])
    LogicalProject(col1=[_UTF-16LE'2':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
      LogicalValues(tuples=[[{ 0 }]])
    LogicalProject(col1=[_UTF-16LE'3':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], col2=[_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
      LogicalValues(tuples=[[{ 0 }]])
    LogicalProject(col1=[_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], col2=[_UTF-16LE'Ciao':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
      LogicalValues(tuples=[[{ 0 }]])

at org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:465)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:580)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)


What is wrong with my code?

Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.11 test Parquet sink

Leonard Xu
Hi, Flavio

I reproduced your issue, and I think it should be a bug. But I’m not sure it comes from Calcite or Flink shaded Calcite, Flink Table Planner module shaded calcite. 

Maybe Danny can help explain more.

CC: Danny

Best
Leonard Xu

在 2020年7月14日,23:06,Flavio Pompermaier <[hidden email]> 写道:

If I use 

final Table inputTable = tableEnv.fromValues(
        DataTypes.ROW(
            DataTypes.FIELD("col1", DataTypes.STRING().notNull()),
            DataTypes.FIELD("col2", DataTypes.STRING().notNull())
        ), ..
  tableEnv.executeSql(//
        "CREATE TABLE `out` (" +
            "col1 STRING," + 
            "col2 STRING" + 
            ") WITH (...)

the job works as expected but this is wrong IMHO because DataTypes.STRING() = DataTypes.STRING().nullable() by default.
If I have DataTypes.STRING().notNull() the type in the CREATE TABLE should be "STRING NOT NULL" . Am I correct?

On Tue, Jul 14, 2020 at 4:50 PM Flavio Pompermaier <[hidden email]> wrote:
Sorry, obviously  " 'format' = 'parquet'" + is without comment :D

On Tue, Jul 14, 2020 at 4:48 PM Flavio Pompermaier <[hidden email]> wrote:
Hi to all,
I'm trying to test write to parquet using the following code but I have an error:

 final TableEnvironment tableEnv = DatalinksExecutionEnvironment.getBatchTableEnv();
    final Table inputTable = tableEnv.fromValues(//
        DataTypes.ROW(//
            DataTypes.FIELD("col1", DataTypes.STRING()), //
            DataTypes.FIELD("col2", DataTypes.STRING())//
        ), //
        Row.of(1L, "Hello"), //
        Row.of(2L, "Hello"), //
        Row.of(3L, ""), //
        Row.of(4L, "Ciao"));
    tableEnv.createTemporaryView("ParquetDataset", inputTable);
    tableEnv.executeSql(//
        "CREATE TABLE `out` (\n" + //
            "col1 STRING,\n" + //
            "col2 STRING\n" + //
            ") WITH (\n" + //
            " 'connector' = 'filesystem',\n" + //
            // " 'format' = 'parquet',\n" + //
            " 'update-mode' = 'append',\n" + //
            " 'path' = 'file://" + TEST_FOLDER + "',\n" + //
            " 'sink.shuffle-by-partition.enable' = 'true'\n" + //
            ")");

    tableEnv.executeSql("INSERT INTO `out` SELECT * FROM ParquetDataset");

---------------------------------

Exception in thread "main" java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
validated type:
RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col1, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col2) NOT NULL
converted type:
RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col1, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col2) NOT NULL
rel:
LogicalProject(col1=[$0], col2=[$1])
  LogicalUnion(all=[true])
    LogicalProject(col1=[_UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
      LogicalValues(tuples=[[{ 0 }]])
    LogicalProject(col1=[_UTF-16LE'2':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
      LogicalValues(tuples=[[{ 0 }]])
    LogicalProject(col1=[_UTF-16LE'3':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], col2=[_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
      LogicalValues(tuples=[[{ 0 }]])
    LogicalProject(col1=[_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], col2=[_UTF-16LE'Ciao':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
      LogicalValues(tuples=[[{ 0 }]])

at org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:465)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:580)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)


What is wrong with my code?


Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.11 test Parquet sink

Jark Wu-3
I think this might be a bug in `tableEnv.fromValues`.

Could you try to remove the DataType parameter, and let the framework derive the types?

final Table inputTable = tableEnv.fromValues(
        Row.of(1L, "Hello"), //
        Row.of(2L, "Hello"), //
        Row.of(3L, ""), //
        Row.of(4L, "Ciao"));

Best,
Jark


On Wed, 15 Jul 2020 at 11:19, Leonard Xu <[hidden email]> wrote:
Hi, Flavio

I reproduced your issue, and I think it should be a bug. But I’m not sure it comes from Calcite or Flink shaded Calcite, Flink Table Planner module shaded calcite. 

Maybe Danny can help explain more.

CC: Danny

Best
Leonard Xu

在 2020年7月14日,23:06,Flavio Pompermaier <[hidden email]> 写道:

If I use 

final Table inputTable = tableEnv.fromValues(
        DataTypes.ROW(
            DataTypes.FIELD("col1", DataTypes.STRING().notNull()),
            DataTypes.FIELD("col2", DataTypes.STRING().notNull())
        ), ..
  tableEnv.executeSql(//
        "CREATE TABLE `out` (" +
            "col1 STRING," + 
            "col2 STRING" + 
            ") WITH (...)

the job works as expected but this is wrong IMHO because DataTypes.STRING() = DataTypes.STRING().nullable() by default.
If I have DataTypes.STRING().notNull() the type in the CREATE TABLE should be "STRING NOT NULL" . Am I correct?

On Tue, Jul 14, 2020 at 4:50 PM Flavio Pompermaier <[hidden email]> wrote:
Sorry, obviously  " 'format' = 'parquet'" + is without comment :D

On Tue, Jul 14, 2020 at 4:48 PM Flavio Pompermaier <[hidden email]> wrote:
Hi to all,
I'm trying to test write to parquet using the following code but I have an error:

 final TableEnvironment tableEnv = DatalinksExecutionEnvironment.getBatchTableEnv();
    final Table inputTable = tableEnv.fromValues(//
        DataTypes.ROW(//
            DataTypes.FIELD("col1", DataTypes.STRING()), //
            DataTypes.FIELD("col2", DataTypes.STRING())//
        ), //
        Row.of(1L, "Hello"), //
        Row.of(2L, "Hello"), //
        Row.of(3L, ""), //
        Row.of(4L, "Ciao"));
    tableEnv.createTemporaryView("ParquetDataset", inputTable);
    tableEnv.executeSql(//
        "CREATE TABLE `out` (\n" + //
            "col1 STRING,\n" + //
            "col2 STRING\n" + //
            ") WITH (\n" + //
            " 'connector' = 'filesystem',\n" + //
            // " 'format' = 'parquet',\n" + //
            " 'update-mode' = 'append',\n" + //
            " 'path' = 'file://" + TEST_FOLDER + "',\n" + //
            " 'sink.shuffle-by-partition.enable' = 'true'\n" + //
            ")");

    tableEnv.executeSql("INSERT INTO `out` SELECT * FROM ParquetDataset");

---------------------------------

Exception in thread "main" java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
validated type:
RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col1, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col2) NOT NULL
converted type:
RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col1, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col2) NOT NULL
rel:
LogicalProject(col1=[$0], col2=[$1])
  LogicalUnion(all=[true])
    LogicalProject(col1=[_UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
      LogicalValues(tuples=[[{ 0 }]])
    LogicalProject(col1=[_UTF-16LE'2':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
      LogicalValues(tuples=[[{ 0 }]])
    LogicalProject(col1=[_UTF-16LE'3':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], col2=[_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
      LogicalValues(tuples=[[{ 0 }]])
    LogicalProject(col1=[_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], col2=[_UTF-16LE'Ciao':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
      LogicalValues(tuples=[[{ 0 }]])

at org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:465)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:580)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)


What is wrong with my code?


Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.11 test Parquet sink

Flavio Pompermaier
If I use tableEnv.fromValues(Row.of(1L, "Hello"),...) things works if I change also the query to "INSERT INTO `out` SELECT CAST(f0 AS STRING) ,f1 FROM ParquetDataset".
If there is still a bug fill a proper JIRA ticket with the exact description of the problem..

Just to conclude this thread there are 2 strange things I found:

1) Is LONG really not supported yet? If I use as output table LONG,STRING I get
      Exception in thread "main" java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlIdentifier: LONG
      at org.apache.calcite.util.Util.needToImplement(Util.java:967)

2) The new planner translates STRING to VARCHAR(2147483647). Is it correct?

Best,
Flavio


On Wed, Jul 15, 2020 at 5:28 AM Jark Wu <[hidden email]> wrote:
I think this might be a bug in `tableEnv.fromValues`.

Could you try to remove the DataType parameter, and let the framework derive the types?

final Table inputTable = tableEnv.fromValues(
        Row.of(1L, "Hello"), //
        Row.of(2L, "Hello"), //
        Row.of(3L, ""), //
        Row.of(4L, "Ciao"));

Best,
Jark


On Wed, 15 Jul 2020 at 11:19, Leonard Xu <[hidden email]> wrote:
Hi, Flavio

I reproduced your issue, and I think it should be a bug. But I’m not sure it comes from Calcite or Flink shaded Calcite, Flink Table Planner module shaded calcite. 

Maybe Danny can help explain more.

CC: Danny

Best
Leonard Xu

在 2020年7月14日,23:06,Flavio Pompermaier <[hidden email]> 写道:

If I use 

final Table inputTable = tableEnv.fromValues(
        DataTypes.ROW(
            DataTypes.FIELD("col1", DataTypes.STRING().notNull()),
            DataTypes.FIELD("col2", DataTypes.STRING().notNull())
        ), ..
  tableEnv.executeSql(//
        "CREATE TABLE `out` (" +
            "col1 STRING," + 
            "col2 STRING" + 
            ") WITH (...)

the job works as expected but this is wrong IMHO because DataTypes.STRING() = DataTypes.STRING().nullable() by default.
If I have DataTypes.STRING().notNull() the type in the CREATE TABLE should be "STRING NOT NULL" . Am I correct?

On Tue, Jul 14, 2020 at 4:50 PM Flavio Pompermaier <[hidden email]> wrote:
Sorry, obviously  " 'format' = 'parquet'" + is without comment :D

On Tue, Jul 14, 2020 at 4:48 PM Flavio Pompermaier <[hidden email]> wrote:
Hi to all,
I'm trying to test write to parquet using the following code but I have an error:

 final TableEnvironment tableEnv = DatalinksExecutionEnvironment.getBatchTableEnv();
    final Table inputTable = tableEnv.fromValues(//
        DataTypes.ROW(//
            DataTypes.FIELD("col1", DataTypes.STRING()), //
            DataTypes.FIELD("col2", DataTypes.STRING())//
        ), //
        Row.of(1L, "Hello"), //
        Row.of(2L, "Hello"), //
        Row.of(3L, ""), //
        Row.of(4L, "Ciao"));
    tableEnv.createTemporaryView("ParquetDataset", inputTable);
    tableEnv.executeSql(//
        "CREATE TABLE `out` (\n" + //
            "col1 STRING,\n" + //
            "col2 STRING\n" + //
            ") WITH (\n" + //
            " 'connector' = 'filesystem',\n" + //
            // " 'format' = 'parquet',\n" + //
            " 'update-mode' = 'append',\n" + //
            " 'path' = 'file://" + TEST_FOLDER + "',\n" + //
            " 'sink.shuffle-by-partition.enable' = 'true'\n" + //
            ")");

    tableEnv.executeSql("INSERT INTO `out` SELECT * FROM ParquetDataset");

---------------------------------

Exception in thread "main" java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
validated type:
RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col1, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col2) NOT NULL
converted type:
RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col1, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col2) NOT NULL
rel:
LogicalProject(col1=[$0], col2=[$1])
  LogicalUnion(all=[true])
    LogicalProject(col1=[_UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
      LogicalValues(tuples=[[{ 0 }]])
    LogicalProject(col1=[_UTF-16LE'2':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
      LogicalValues(tuples=[[{ 0 }]])
    LogicalProject(col1=[_UTF-16LE'3':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], col2=[_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
      LogicalValues(tuples=[[{ 0 }]])
    LogicalProject(col1=[_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], col2=[_UTF-16LE'Ciao':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
      LogicalValues(tuples=[[{ 0 }]])

at org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:465)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:580)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)


What is wrong with my code?


Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.11 test Parquet sink

Dawid Wysakowicz-2

Hi,

Unfortunately this is a bug.

The problem is in CustomizedConvertRule#convertCast as it drops the requested nullability. It was fixed in master as part of FLINK-13784[1]. Therefore the example works on master.

Could you create a jira issue for 1.11 version? We could backport the corresponding part of FLINK-13784. As a workaround you can try using the values without registering it in the catalog, as the registration triggers the type check. (I know this is not perfect):

    final Table inputTable = tableEnv.fromValues(//
        DataTypes.ROW(//
            DataTypes.FIELD("col1", DataTypes.STRING()), //
            DataTypes.FIELD("col2", DataTypes.STRING())//
        ), ...);
    tableEnv.executeSql(//
        "CREATE TABLE `out` (\n" + //
            "col1 STRING,\n" + //
            "col2 STRING\n" + //
            ") WITH (\n" + //
            " 'connector' = 'filesystem',\n" + //
            // " 'format' = 'parquet',\n" + //
            " 'update-mode' = 'append',\n" + //
            " 'path' = '<a class="moz-txt-link-freetext" href="file://">file://" + TEST_FOLDER + "',\n" + //
            " 'sink.shuffle-by-partition.enable' = 'true'\n" + //
            ")");

    inputTable.executeInsert(`out`);

As for the types SQL does not have LONG nor STRING types. Java's long is equivalent to SQL's BIGINT. STRING is only an alias for VARCHAR(Long.MAX_VALUE), which was added for improved usability so that you do not need to type the max long manually. For complete list of supported types see the docs[2]


[1] https://issues.apache.org/jira/browse/FLINK-13784

[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html

Best,

Dawid

On 15/07/2020 09:40, Flavio Pompermaier wrote:
If I use tableEnv.fromValues(Row.of(1L, "Hello"),...) things works if I change also the query to "INSERT INTO `out` SELECT CAST(f0 AS STRING) ,f1 FROM ParquetDataset".
If there is still a bug fill a proper JIRA ticket with the exact description of the problem..

Just to conclude this thread there are 2 strange things I found:

1) Is LONG really not supported yet? If I use as output table LONG,STRING I get
      Exception in thread "main" java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlIdentifier: LONG
      at org.apache.calcite.util.Util.needToImplement(Util.java:967)

2) The new planner translates STRING to VARCHAR(2147483647). Is it correct?

Best,
Flavio


On Wed, Jul 15, 2020 at 5:28 AM Jark Wu <[hidden email]> wrote:
I think this might be a bug in `tableEnv.fromValues`.

Could you try to remove the DataType parameter, and let the framework derive the types?

final Table inputTable = tableEnv.fromValues(
        Row.of(1L, "Hello"), //
        Row.of(2L, "Hello"), //
        Row.of(3L, ""), //
        Row.of(4L, "Ciao"));

Best,
Jark


On Wed, 15 Jul 2020 at 11:19, Leonard Xu <[hidden email]> wrote:
Hi, Flavio

I reproduced your issue, and I think it should be a bug. But I’m not sure it comes from Calcite or Flink shaded Calcite, Flink Table Planner module shaded calcite. 

Maybe Danny can help explain more.

CC: Danny

Best
Leonard Xu

在 2020年7月14日,23:06,Flavio Pompermaier <[hidden email]> 写道:

If I use 

final Table inputTable = tableEnv.fromValues(
        DataTypes.ROW(
            DataTypes.FIELD("col1", DataTypes.STRING().notNull()),
            DataTypes.FIELD("col2", DataTypes.STRING().notNull())
        ), ..
  tableEnv.executeSql(//
        "CREATE TABLE `out` (" +
            "col1 STRING," + 
            "col2 STRING" + 
            ") WITH (...)

the job works as expected but this is wrong IMHO because DataTypes.STRING() = DataTypes.STRING().nullable() by default.
If I have DataTypes.STRING().notNull() the type in the CREATE TABLE should be "STRING NOT NULL" . Am I correct?

On Tue, Jul 14, 2020 at 4:50 PM Flavio Pompermaier <[hidden email]> wrote:
Sorry, obviously  " 'format' = 'parquet'" + is without comment :D

On Tue, Jul 14, 2020 at 4:48 PM Flavio Pompermaier <[hidden email]> wrote:
Hi to all,
I'm trying to test write to parquet using the following code but I have an error:

 final TableEnvironment tableEnv = DatalinksExecutionEnvironment.getBatchTableEnv();
    final Table inputTable = tableEnv.fromValues(//
        DataTypes.ROW(//
            DataTypes.FIELD("col1", DataTypes.STRING()), //
            DataTypes.FIELD("col2", DataTypes.STRING())//
        ), //
        Row.of(1L, "Hello"), //
        Row.of(2L, "Hello"), //
        Row.of(3L, ""), //
        Row.of(4L, "Ciao"));
    tableEnv.createTemporaryView("ParquetDataset", inputTable);
    tableEnv.executeSql(//
        "CREATE TABLE `out` (\n" + //
            "col1 STRING,\n" + //
            "col2 STRING\n" + //
            ") WITH (\n" + //
            " 'connector' = 'filesystem',\n" + //
            // " 'format' = 'parquet',\n" + //
            " 'update-mode' = 'append',\n" + //
            " 'path' = '<a class="moz-txt-link-freetext" href="file://">file://" + TEST_FOLDER + "',\n" + //
            " 'sink.shuffle-by-partition.enable' = 'true'\n" + //
            ")");

    tableEnv.executeSql("INSERT INTO `out` SELECT * FROM ParquetDataset");

---------------------------------

Exception in thread "main" java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
validated type:
RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col1, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col2) NOT NULL
converted type:
RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col1, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col2) NOT NULL
rel:
LogicalProject(col1=[$0], col2=[$1])
  LogicalUnion(all=[true])
    LogicalProject(col1=[_UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
      LogicalValues(tuples=[[{ 0 }]])
    LogicalProject(col1=[_UTF-16LE'2':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
      LogicalValues(tuples=[[{ 0 }]])
    LogicalProject(col1=[_UTF-16LE'3':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], col2=[_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
      LogicalValues(tuples=[[{ 0 }]])
    LogicalProject(col1=[_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], col2=[_UTF-16LE'Ciao':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
      LogicalValues(tuples=[[{ 0 }]])

at org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:465)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:580)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)


What is wrong with my code?



signature.asc (849 bytes) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.11 test Parquet sink

Flavio Pompermaier
I've just opened a ticket on JIRA: https://issues.apache.org/jira/browse/FLINK-18608

On Wed, Jul 15, 2020 at 10:10 AM Dawid Wysakowicz <[hidden email]> wrote:

Hi,

Unfortunately this is a bug.

The problem is in CustomizedConvertRule#convertCast as it drops the requested nullability. It was fixed in master as part of FLINK-13784[1]. Therefore the example works on master.

Could you create a jira issue for 1.11 version? We could backport the corresponding part of FLINK-13784. As a workaround you can try using the values without registering it in the catalog, as the registration triggers the type check. (I know this is not perfect):

    final Table inputTable = tableEnv.fromValues(//
        DataTypes.ROW(//
            DataTypes.FIELD("col1", DataTypes.STRING()), //
            DataTypes.FIELD("col2", DataTypes.STRING())//
        ), ...);
    tableEnv.executeSql(//
        "CREATE TABLE `out` (\n" + //
            "col1 STRING,\n" + //
            "col2 STRING\n" + //
            ") WITH (\n" + //
            " 'connector' = 'filesystem',\n" + //
            // " 'format' = 'parquet',\n" + //
            " 'update-mode' = 'append',\n" + //
            " 'path' = 'file://" + TEST_FOLDER + "',\n" + //
            " 'sink.shuffle-by-partition.enable' = 'true'\n" + //
            ")");

    inputTable.executeInsert(`out`);

As for the types SQL does not have LONG nor STRING types. Java's long is equivalent to SQL's BIGINT. STRING is only an alias for VARCHAR(Long.MAX_VALUE), which was added for improved usability so that you do not need to type the max long manually. For complete list of supported types see the docs[2]


[1] https://issues.apache.org/jira/browse/FLINK-13784

[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html

Best,

Dawid

On 15/07/2020 09:40, Flavio Pompermaier wrote:
If I use tableEnv.fromValues(Row.of(1L, "Hello"),...) things works if I change also the query to "INSERT INTO `out` SELECT CAST(f0 AS STRING) ,f1 FROM ParquetDataset".
If there is still a bug fill a proper JIRA ticket with the exact description of the problem..

Just to conclude this thread there are 2 strange things I found:

1) Is LONG really not supported yet? If I use as output table LONG,STRING I get
      Exception in thread "main" java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlIdentifier: LONG
      at org.apache.calcite.util.Util.needToImplement(Util.java:967)

2) The new planner translates STRING to VARCHAR(2147483647). Is it correct?

Best,
Flavio


On Wed, Jul 15, 2020 at 5:28 AM Jark Wu <[hidden email]> wrote:
I think this might be a bug in `tableEnv.fromValues`.

Could you try to remove the DataType parameter, and let the framework derive the types?

final Table inputTable = tableEnv.fromValues(
        Row.of(1L, "Hello"), //
        Row.of(2L, "Hello"), //
        Row.of(3L, ""), //
        Row.of(4L, "Ciao"));

Best,
Jark


On Wed, 15 Jul 2020 at 11:19, Leonard Xu <[hidden email]> wrote:
Hi, Flavio

I reproduced your issue, and I think it should be a bug. But I’m not sure it comes from Calcite or Flink shaded Calcite, Flink Table Planner module shaded calcite. 

Maybe Danny can help explain more.

CC: Danny

Best
Leonard Xu

在 2020年7月14日,23:06,Flavio Pompermaier <[hidden email]> 写道:

If I use 

final Table inputTable = tableEnv.fromValues(
        DataTypes.ROW(
            DataTypes.FIELD("col1", DataTypes.STRING().notNull()),
            DataTypes.FIELD("col2", DataTypes.STRING().notNull())
        ), ..
  tableEnv.executeSql(//
        "CREATE TABLE `out` (" +
            "col1 STRING," + 
            "col2 STRING" + 
            ") WITH (...)

the job works as expected but this is wrong IMHO because DataTypes.STRING() = DataTypes.STRING().nullable() by default.
If I have DataTypes.STRING().notNull() the type in the CREATE TABLE should be "STRING NOT NULL" . Am I correct?

On Tue, Jul 14, 2020 at 4:50 PM Flavio Pompermaier <[hidden email]> wrote:
Sorry, obviously  " 'format' = 'parquet'" + is without comment :D

On Tue, Jul 14, 2020 at 4:48 PM Flavio Pompermaier <[hidden email]> wrote:
Hi to all,
I'm trying to test write to parquet using the following code but I have an error:

 final TableEnvironment tableEnv = DatalinksExecutionEnvironment.getBatchTableEnv();
    final Table inputTable = tableEnv.fromValues(//
        DataTypes.ROW(//
            DataTypes.FIELD("col1", DataTypes.STRING()), //
            DataTypes.FIELD("col2", DataTypes.STRING())//
        ), //
        Row.of(1L, "Hello"), //
        Row.of(2L, "Hello"), //
        Row.of(3L, ""), //
        Row.of(4L, "Ciao"));
    tableEnv.createTemporaryView("ParquetDataset", inputTable);
    tableEnv.executeSql(//
        "CREATE TABLE `out` (\n" + //
            "col1 STRING,\n" + //
            "col2 STRING\n" + //
            ") WITH (\n" + //
            " 'connector' = 'filesystem',\n" + //
            // " 'format' = 'parquet',\n" + //
            " 'update-mode' = 'append',\n" + //
            " 'path' = 'file://" + TEST_FOLDER + "',\n" + //
            " 'sink.shuffle-by-partition.enable' = 'true'\n" + //
            ")");

    tableEnv.executeSql("INSERT INTO `out` SELECT * FROM ParquetDataset");

---------------------------------

Exception in thread "main" java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
validated type:
RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col1, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col2) NOT NULL
converted type:
RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col1, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col2) NOT NULL
rel:
LogicalProject(col1=[$0], col2=[$1])
  LogicalUnion(all=[true])
    LogicalProject(col1=[_UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
      LogicalValues(tuples=[[{ 0 }]])
    LogicalProject(col1=[_UTF-16LE'2':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
      LogicalValues(tuples=[[{ 0 }]])
    LogicalProject(col1=[_UTF-16LE'3':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], col2=[_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
      LogicalValues(tuples=[[{ 0 }]])
    LogicalProject(col1=[_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], col2=[_UTF-16LE'Ciao':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
      LogicalValues(tuples=[[{ 0 }]])

at org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:465)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:580)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)


What is wrong with my code?