You cannot do that in Flink yet; a Flink partition column must be mapped to a column of the table schema, which you can then select from. The syntax is a little different from Hive's:
create table table_name (
  id int,
  dtDontQuery string,
  name string
)
partitioned by (date string)
where you declare the partition column's name and type at the same time.
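In Flink, by contrast, the partition column is a regular column of the schema itself. A minimal sketch of the equivalent Flink DDL, reusing the column names from the Hive example above (`date` is backquoted because it is a reserved word):

```sql
-- Flink-style sketch of the Hive DDL above:
-- the partition column `date` is declared as part of the table schema.
CREATE TABLE table_name (
  id INT,
  dtDontQuery STRING,
  name STRING,
  `date` STRING  -- backticks needed: DATE is a reserved word
) PARTITIONED BY (`date`);
```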
On Jul 21, 2020 at 11:30 PM +0800, Dongwon Kim < [hidden email]> wrote:
Thanks Jark for the update.
However, getting back to the original question, can I use a nested column directly for CREATE TABLE PARTITIONED BY like below without declaring an additional column?
CREATE TABLE output
PARTITIONED BY (`location.transId`)
WITH (
'connector' = 'filesystem',
'path' = 'east-out',
'format' = 'json'
) LIKE navi (EXCLUDING ALL)
I tried (`location`.transId) as well but it fails with an exception:
Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "." at line 3, column 27.
Was expecting one of:
")" ...
"," ...
at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
at com.kakaomobility.SQLExecutor.main(SQLExecutor.java:28)
Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "." at line 3, column 27.
Was expecting one of:
")" ...
"," ...
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:416)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:201)
at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:148)
at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:163)
at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:188)
at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54)
... 3 more
Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "." at line 3, column 27.
Was expecting one of:
")" ...
"," ...
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:36086)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:35900)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:21398)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateTable(FlinkSqlParserImpl.java:5292)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateExtended(FlinkSqlParserImpl.java:6269)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreate(FlinkSqlParserImpl.java:19047)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3308)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3800)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:248)
at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:161)
... 5 more
Best,
Dongwon
Hi Dongwon,
I think this is a bug in the Filesystem connector which doesn't exclude the computed columns when building the TableSource.
I created an issue [1] to track this problem.
Best,
Jark
Hi Danny,
Which version did you use?
I use Flink 1.11.0.
What SQL context throws the error?
I think the declaration itself is not a problem.
The exception occurs when I try to execute the following, which I didn't show you in the previous email:
tEnv.sqlQuery("SELECT type, location FROM navi").executeInsert("output")
Thanks,
Dongwon
Hi, I executed the SQL below:
val sql0 =
  """
    |create table navi (
    |  a STRING,
    |  location ROW<lastUpdateTime BIGINT, transId STRING>
    |) with (
    |  'connector' = 'filesystem',
    |  'path' = 'east-out',
    |  'format' = 'json'
    |)
    |""".stripMargin
tableEnv.executeSql(sql0)
val sql =
  """
    |CREATE TABLE output (
    |  `partition` AS location.transId
    |) PARTITIONED BY (`partition`)
    |WITH (
    |  'connector' = 'filesystem',
    |  'path' = 'east-out',
    |  'format' = 'json'
    |) LIKE navi (EXCLUDING ALL)
    |""".stripMargin
tableEnv.executeSql(sql)
On the master branch, both statements work. Can you share your detailed stack trace? Which version did you use, and what SQL context throws the error?
On Jul 21, 2020 at 4:55 PM +0800, Dongwon Kim < [hidden email]> wrote:
Hi,
I want to create subdirectories named after values of a nested column, location.transId.
This is my first attempt:
CREATE TABLE output
PARTITIONED BY (`location.transId`)
WITH (
'connector' = 'filesystem',
'path' = 'east-out',
'format' = 'json'
) LIKE navi (EXCLUDING ALL)
It fails with the following error:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Partition column 'location.transId' not defined in the table schema. Available columns: ['type', 'location']
at org.apache.flink.table.planner.operations.SqlCreateTableConverter.verifyPartitioningColumnsExist(SqlCreateTableConverter.java:164)
at org.apache.flink.table.planner.operations.SqlCreateTableConverter.createCatalogTable(SqlCreateTableConverter.java:130)
at org.apache.flink.table.planner.operations.SqlCreateTableConverter.convertCreateTable(SqlCreateTableConverter.java:76)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:190)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
at com.kakaomobility.SQLExecutor.main(SQLExecutor.java:28)
Since nested columns don't seem to be recognized as eligible columns for PARTITIONED BY, I tried the following:
CREATE TABLE output (
`partition` AS location.transId
) PARTITIONED BY (`partition`)
WITH (
'connector' = 'filesystem',
'path' = 'east-out',
'format' = 'json'
) LIKE navi (EXCLUDING ALL)
It also fails:
Exception in thread "main" org.apache.flink.table.api.ValidationException: The field count of logical schema of the table does not match with the field count of physical schema
. The logical schema: [STRING,ROW<`lastUpdateTime` BIGINT, `transId` STRING>]
The physical schema: [STRING,ROW<`lastUpdateTime` BIGINT, `transId` STRING>,STRING].
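One possible workaround to try, sketched here under the assumption that `navi` has the columns `type` and `location` reported in the earlier exception: drop the LIKE clause and the computed column, restate the schema with a plain top-level column for the partition value, and fill it by projecting the nested field in the INSERT. Whether this sidesteps the filesystem-connector issue is untested here.

```sql
-- Sketch of a workaround: declare the partition field as a regular
-- top-level column instead of a computed column, then populate it
-- from the nested field in the INSERT.
CREATE TABLE output (
  `type` STRING,
  location ROW<lastUpdateTime BIGINT, transId STRING>,
  transId STRING
) PARTITIONED BY (transId)
WITH (
  'connector' = 'filesystem',
  'path' = 'east-out',
  'format' = 'json'
);

INSERT INTO output
SELECT `type`, location, location.transId FROM navi;
```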
Thanks in advance,
Dongwon