How to use a nested column for CREATE TABLE PARTITIONED BY

Dongwon Kim-2
Hi,

I want to create subdirectories named after values of a nested column, location.transId.

This is my first attempt:
CREATE TABLE output
PARTITIONED BY (`location.transId`)
WITH (
  'connector' = 'filesystem',
  'path' = 'east-out',
  'format' = 'json'
) LIKE navi (EXCLUDING ALL)

It fails with the following error:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Partition column 'location.transId' not defined in the table schema. Available columns: ['type', 'location']
at org.apache.flink.table.planner.operations.SqlCreateTableConverter.verifyPartitioningColumnsExist(SqlCreateTableConverter.java:164)
at org.apache.flink.table.planner.operations.SqlCreateTableConverter.createCatalogTable(SqlCreateTableConverter.java:130)
at org.apache.flink.table.planner.operations.SqlCreateTableConverter.convertCreateTable(SqlCreateTableConverter.java:76)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:190)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
at com.kakaomobility.SQLExecutor.main(SQLExecutor.java:28)
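The error message suggests that partition keys are compared only against the table's top-level column names. A backquoted `location.transId` is then a single identifier whose name happens to contain a dot, so it matches nothing in ['type', 'location']. The sketch below mimics that check with plain Java collections. It is an illustration under that assumption, not Flink's `verifyPartitioningColumnsExist`, and the class and method names are hypothetical.

```java
import java.util.List;

// Hypothetical sketch mimicking (not reproducing) the partition-key check
// that fails in the stack trace above.
public class PartitionKeyCheck {

    // Every partition key must equal one of the table's top-level column names.
    // A backquoted name such as `location.transId` is ONE identifier, so it is
    // compared as the literal string "location.transId".
    static void verifyPartitionKeys(List<String> topLevelColumns, List<String> partitionKeys) {
        for (String key : partitionKeys) {
            if (!topLevelColumns.contains(key)) {
                throw new IllegalArgumentException(
                        "Partition column '" + key + "' not defined in the table schema. "
                                + "Available columns: " + topLevelColumns);
            }
        }
    }

    public static void main(String[] args) {
        // Top-level columns of `navi`, as listed in the error message.
        List<String> columns = List.of("type", "location");

        verifyPartitionKeys(columns, List.of("location")); // passes: a top-level column
        try {
            verifyPartitionKeys(columns, List.of("location.transId"));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Only exact top-level names pass, which is why a nested field has to be exposed as its own column before it can be used as a partition key.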

Since nested columns do not seem to be accepted as partition columns in PARTITIONED BY, I tried declaring a computed column instead:
CREATE TABLE output (
  `partition` AS location.transId
) PARTITIONED BY (`partition`)
WITH (
  'connector' = 'filesystem',
  'path' = 'east-out',
  'format' = 'json'
) LIKE navi (EXCLUDING ALL)
It also fails:
Exception in thread "main" org.apache.flink.table.api.ValidationException: The field count of logical schema of the table does not match with the field count of physical schema.
The logical schema: [STRING,ROW<`lastUpdateTime` BIGINT, `transId` STRING>]
The physical schema: [STRING,ROW<`lastUpdateTime` BIGINT, `transId` STRING>,STRING].

Thanks in advance,

Dongwon

Re: How to use a nested column for CREATE TABLE PARTITIONED BY

Danny Chan
Hi, I executed the SQL below:

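// Assumed setup, not shown in the original message: a Blink-planner TableEnvironment.
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnv = TableEnvironment.create(settings)

val sql0 =
  """
    |create table navi (
    |  a STRING,
    |  location ROW<lastUpdateTime BIGINT, transId STRING>
    |) with (
    |  'connector' = 'filesystem',
    |  'path' = 'east-out',
    |  'format' = 'json'
    |)
    |""".stripMargin
tableEnv.executeSql(sql0)

val sql =
  """
    |CREATE TABLE output (
    |  `partition` AS location.transId
    |) PARTITIONED BY (`partition`)
    |WITH (
    |  'connector' = 'filesystem',
    |  'path' = 'east-out',
    |  'format' = 'json'
    |) LIKE navi (EXCLUDING ALL)
    |""".stripMargin
tableEnv.executeSql(sql)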

On the master branch, both statements run without errors. Can you share the full stack trace? Which version are you using, and which SQL statement throws the error?

Best,
Danny Chan
In reply to Dongwon Kim <[hidden email]>, Jul 21, 2020, 4:55 PM +0800.

Re: How to use a nested column for CREATE TABLE PARTITIONED BY

Dongwon Kim-2
Hi Danny,

> Which version did you use?
I'm using Flink 1.11.0.

> What SQL context throws the error?
I think the declaration itself is not the problem.
The exception occurs when I execute the following statement, which I didn't show in my previous email:
tEnv.sqlQuery("SELECT type, location FROM navi").executeInsert("output")
 
Thanks,

Dongwon

In reply to Danny Chan <[hidden email]>, Tue, Jul 21, 2020, 6:16 PM.

Re: How to use a nested column for CREATE TABLE PARTITIONED BY

Jark Wu-3
Hi Dongwon,

I think this is a bug in the filesystem connector: it doesn't exclude computed columns when building the TableSource.
I created an issue [1] to track this problem.
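This diagnosis matches the numbers in Dongwon's error. `output` has two regular columns copied from `navi` by LIKE (`type` and `location`) plus the computed `partition` column. If computed columns are dropped on one side but not the other, one side sees two fields and the other three. The sketch below models that with a hypothetical `Column` class; it is not Flink's `TableSchema` API, and which side Flink labels "logical" versus "physical" is taken only from the error text.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical model of a table schema; this is NOT Flink's TableSchema API.
public class ComputedColumnCount {

    // A column is either physical (stored in the files) or computed (derived by an expression).
    static final class Column {
        final String name;
        final String type;
        final boolean computed;

        Column(String name, String type, boolean computed) {
            this.name = name;
            this.type = type;
            this.computed = computed;
        }
    }

    // Expected behavior: only non-computed columns describe what is stored in the files.
    static List<String> typesExcludingComputed(List<Column> schema) {
        return schema.stream()
                .filter(c -> !c.computed)
                .map(c -> c.type)
                .collect(Collectors.toList());
    }

    // Behavior suggested by the bug report: computed columns leak into the schema
    // handed to the file source, adding one field per computed column.
    static List<String> typesIncludingComputed(List<Column> schema) {
        return schema.stream()
                .map(c -> c.type)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Columns of `output`: two copied from `navi` via LIKE, plus the computed `partition`.
        List<Column> output = List.of(
                new Column("type", "STRING", false),
                new Column("location", "ROW<`lastUpdateTime` BIGINT, `transId` STRING>", false),
                new Column("partition", "STRING", true));

        List<String> logical = typesExcludingComputed(output);
        List<String> physical = typesIncludingComputed(output);
        if (logical.size() != physical.size()) {
            System.out.println("Field count mismatch: logical " + logical + " vs physical " + physical);
        }
    }
}
```

Filtering out computed columns on both sides would make the counts agree, which is the kind of fix the issue tracks.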

Best,
Jark


In reply to Dongwon Kim <[hidden email]>, Tue, 21 Jul 2020, 17:31.

Re: How to use a nested column for CREATE TABLE PARTITIONED BY

Dongwon Kim-2
Thanks for the update, Jark.

However, getting back to my original question: can I use a nested column directly in CREATE TABLE ... PARTITIONED BY, as below, without declaring an additional column?

CREATE TABLE output
PARTITIONED BY (`location.transId`)
WITH (
  'connector' = 'filesystem',
  'path' = 'east-out',
  'format' = 'json'
) LIKE navi (EXCLUDING ALL)

I also tried (`location`.transId), but it fails with the following exception:
Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "." at line 3, column 27.
Was expecting one of:
    ")" ...
    "," ...
   
at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
at com.kakaomobility.SQLExecutor.main(SQLExecutor.java:28)
Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "." at line 3, column 27.
Was expecting one of:
    ")" ...
    "," ...
   
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:416)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:201)
at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:148)
at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:163)
at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:188)
at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54)
... 3 more
Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "." at line 3, column 27.
Was expecting one of:
    ")" ...
    "," ...
   
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:36086)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:35900)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:21398)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateTable(FlinkSqlParserImpl.java:5292)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateExtended(FlinkSqlParserImpl.java:6269)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreate(FlinkSqlParserImpl.java:19047)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3308)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3800)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:248)
at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:161)
... 5 more
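The trace shows the failure inside `ParenthesizedSimpleIdentifierList`, which suggests the PARTITIONED BY clause only accepts simple (single-part) identifiers, so the parser stops at the dot. The much-simplified sketch below illustrates that rule; it is a hypothetical hand-written parser, not Flink's generated one. It also shows why the first attempt got past parsing: `location.transId` inside one pair of backquotes is a single simple identifier.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, much-simplified sketch of a rule accepting only a parenthesized
// list of simple identifiers, e.g. (`a`, b). Not Flink's generated parser.
public class SimpleIdentifierList {

    static List<String> parse(String input) {
        String s = input.trim();
        if (s.isEmpty() || s.charAt(0) != '(') {
            throw new IllegalArgumentException("Expected \"(\" at position 0");
        }
        int pos = 1;
        List<String> names = new ArrayList<>();
        while (true) {
            pos = skipSpaces(s, pos);
            StringBuilder name = new StringBuilder();
            if (pos < s.length() && s.charAt(pos) == '`') {
                // Backquoted identifier: everything up to the closing backquote,
                // so `location.transId` is ONE identifier that contains a dot.
                int end = s.indexOf('`', pos + 1);
                if (end < 0) {
                    throw new IllegalArgumentException("Unterminated backquote");
                }
                name.append(s, pos + 1, end);
                pos = end + 1;
            } else {
                // Bare identifier: letters, digits and underscores only.
                while (pos < s.length()
                        && (Character.isLetterOrDigit(s.charAt(pos)) || s.charAt(pos) == '_')) {
                    name.append(s.charAt(pos++));
                }
            }
            if (name.length() == 0) {
                throw new IllegalArgumentException("Expected identifier at position " + pos);
            }
            names.add(name.toString());
            pos = skipSpaces(s, pos);
            if (pos >= s.length()) {
                throw new IllegalArgumentException("Unexpected end of input");
            }
            char c = s.charAt(pos);
            if (c == ',') {
                pos++;
                continue;
            }
            if (c == ')') {
                return names;
            }
            // Anything else, including the '.' in (`location`.transId), is rejected.
            throw new IllegalArgumentException("Encountered \"" + c + "\" at position " + pos
                    + ". Was expecting one of: \")\" \",\"");
        }
    }

    static int skipSpaces(String s, int pos) {
        while (pos < s.length() && Character.isWhitespace(s.charAt(pos))) {
            pos++;
        }
        return pos;
    }

    public static void main(String[] args) {
        System.out.println(parse("(`partition`)"));        // accepted
        System.out.println(parse("(`location.transId`)")); // accepted as one name
        try {
            parse("(`location`.transId)");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```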

Best,

Dongwon

In reply to Jark Wu <[hidden email]>, Wed, Jul 22, 2020, 12:09 AM.

Re: How to use a nested column for CREATE TABLE PARTITIONED BY

Danny Chan
You cannot do that in Flink yet. A Flink partition column must map to a column of the table schema, that is, a column you can select from. The syntax is a little different from Hive's:

create table table_name (
  id                int,
  dtDontQuery       string,
  name              string
)
partitioned by (`date` string)

In Hive, you declare the partition column's name and type directly in the PARTITIONED BY clause, separately from the regular columns.
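In practice this means the nested value has to be available as its own top-level column before it can be a partition key. Flink's filesystem connector then writes each distinct partition value to a Hive-style `<column>=<value>` subdirectory under `path`. The sketch below models only that resulting layout in plain Java. The `Location` and `NaviRecord` classes and the sample values are hypothetical, and the path escaping and null/default-partition handling the connector applies are deliberately left out.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical sketch of the directory layout a partitioned filesystem sink produces.
// Path escaping and null/default-partition handling are deliberately left out.
public class PartitionLayout {

    // Mirrors `location ROW<lastUpdateTime BIGINT, transId STRING>`.
    static final class Location {
        final long lastUpdateTime;
        final String transId;

        Location(long lastUpdateTime, String transId) {
            this.lastUpdateTime = lastUpdateTime;
            this.transId = transId;
        }
    }

    // Mirrors a row of `navi` with columns `type` and `location`.
    static final class NaviRecord {
        final String type;
        final Location location;

        NaviRecord(String type, Location location) {
            this.type = type;
            this.location = location;
        }
    }

    // Step 1: lift the nested value up to a top-level partition value.
    // Step 2: turn it into a Hive-style "<column>=<value>" subdirectory under basePath.
    static String partitionDir(String basePath, String partitionColumn, NaviRecord r) {
        String partitionValue = r.location.transId;
        return basePath + "/" + partitionColumn + "=" + partitionValue;
    }

    // Groups records by the subdirectory they would land in (sorted for stable output).
    static Map<String, List<NaviRecord>> groupByDirectory(
            String basePath, String partitionColumn, List<NaviRecord> records) {
        return records.stream().collect(Collectors.groupingBy(
                r -> partitionDir(basePath, partitionColumn, r),
                TreeMap::new,
                Collectors.toList()));
    }

    public static void main(String[] args) {
        List<NaviRecord> records = List.of(
                new NaviRecord("a", new Location(1L, "t1")),
                new NaviRecord("b", new Location(2L, "t2")),
                new NaviRecord("c", new Location(3L, "t1")));

        groupByDirectory("east-out", "partition", records).forEach(
                (dir, rows) -> System.out.println(dir + " -> " + rows.size() + " record(s)"));
    }
}
```

The column name `partition` follows the computed-column attempt earlier in the thread; any top-level column name would produce the same kind of layout.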

Best,
Danny Chan
In reply to Dongwon Kim <[hidden email]>, Jul 21, 2020, 11:30 PM +0800.