Row function cannot have column reference through table alias

马阳阳
We have a SQL statement that composes a row from a table's columns. A simplified version of the statement:
INSERT INTO flink_log_sink
SELECT
    b.id,
    Row(b.app_id, b.message)
FROM flink_log_source a
JOIN flink_log_side b
ON a.id = b.id;

When we submit the SQL to Flink, it fails to parse with the following error message:
org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "." at line 11, column 8.
Was expecting one of:
")" ...
"," ...

at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76)
at cn.imdada.bi.dfl2.core.operation.InsertIntoOperation.execute(InsertIntoOperation.java:35)
at cn.imdada.bi.dfl2.core.Main.execute(Main.java:172)
at cn.imdada.bi.dfl2.core.Main.main(Main.java:125)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:153)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:112)
at cn.imdada.bi.dfl2.launcher.yarn.YarnPerJobSubmitter.submit(YarnPerJobSubmitter.java:37)
at cn.imdada.bi.dfl2.launcher.LauncherMain.main(LauncherMain.java:127)
Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "." at line 11, column 8.
Was expecting one of:
")" ...
"," ...

at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:442)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:205)
at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:140)
at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:155)
at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:180)
at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54)
... 15 more
Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "." at line 11, column 8.
Was expecting one of:
")" ...
"," ...

at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:39525)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:39336)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:24247)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression3(FlinkSqlParserImpl.java:19024)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2b(FlinkSqlParserImpl.java:18680)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2(FlinkSqlParserImpl.java:18721)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression(FlinkSqlParserImpl.java:18652)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectExpression(FlinkSqlParserImpl.java:11656)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectItem(FlinkSqlParserImpl.java:10508)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectList(FlinkSqlParserImpl.java:10495)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlSelect(FlinkSqlParserImpl.java:7115)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQuery(FlinkSqlParserImpl.java:684)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQueryOrExpr(FlinkSqlParserImpl.java:18635)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.QueryOrExpr(FlinkSqlParserImpl.java:18089)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.OrderedQueryOrExpr(FlinkSqlParserImpl.java:558)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.RichSqlInsert(FlinkSqlParserImpl.java:5709)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3342)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3882)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:253)
at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:153)
... 17 more

Is this a bug or the expected behavior? If it is the expected behavior, how can we work around it?

PS:
I tried creating a view to represent the join result and inserting from the view into the sink table. Unfortunately, that did not work either.

Re: Row function cannot have column reference through table alias

马阳阳
The error message when using a view is as follows:
org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:

FlinkLogicalSink(table=[default_catalog.default_database.flink_log_sink], fields=[id, EXPR$1])
+- FlinkLogicalCalc(select=[id, ROW(family.app_id, family.message) AS EXPR$1])
   +- FlinkLogicalJoin(condition=[true], joinType=[inner])
      :- FlinkLogicalCalc(select=[app_id], where=[IS NOT NULL(id)])
      :  +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, flink_log_source]], fields=[app_id, id, log_time, message])
      +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, flink_log_side]], fields=[id, family])

This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.
at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:72)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:286)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1275)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:680)
at cn.imdada.bi.dfl2.core.operation.InsertIntoOperation.execute(InsertIntoOperation.java:43)
at cn.imdada.bi.dfl2.core.Main.execute(Main.java:172)
at cn.imdada.bi.dfl2.core.Main.main(Main.java:125)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:153)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:112)
at cn.imdada.bi.dfl2.launcher.yarn.YarnPerJobSubmitter.submit(YarnPerJobSubmitter.java:37)
at cn.imdada.bi.dfl2.launcher.LauncherMain.main(LauncherMain.java:127)
Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=STREAM_PHYSICAL, FlinkRelDistributionTraitDef=any, MiniBatchIntervalTraitDef=None: 0, ModifyKindSetTraitDef=[NONE], UpdateKindTraitDef=[NONE].
Missing conversion is FlinkLogicalTableSourceScan[convention: LOGICAL -> STREAM_PHYSICAL, FlinkRelDistributionTraitDef: any -> single]
There is 1 empty subset: rel#311:RelSubset#14.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE], the relevant part of the original plan is as follows
181:FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, flink_log_side]], fields=[id, family])

Root: rel#303:RelSubset#17.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]
Original rel:
FlinkLogicalSink(subset=[rel#175:RelSubset#6.LOGICAL.any.None: 0.[NONE].[NONE]], table=[default_catalog.default_database.flink_log_sink], fields=[id, EXPR$1]): rowcount = 9.0E15, cumulative cost = {9.0E15 rows, 9.0E15 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 198
FlinkLogicalCalc(subset=[rel#197:RelSubset#5.LOGICAL.any.None: 0.[NONE].[NONE]], select=[id, ROW(family.app_id, family.message) AS EXPR$1]): rowcount = 9.0E15, cumulative cost = {9.0E15 rows, 9.0E15 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 235
FlinkLogicalJoin(subset=[rel#212:RelSubset#9.LOGICAL.any.None: 0.[NONE].[NONE]], condition=[true], joinType=[inner]): rowcount = 9.0E15, cumulative cost = {9.0E7 rows, 1.9E8 cpu, 1.18E9 io, 0.0 network, 0.0 memory}, id = 211
FlinkLogicalCalc(subset=[rel#210:RelSubset#7.LOGICAL.any.None: 0.[NONE].[NONE]], select=[app_id], where=[IS NOT NULL(id)]): rowcount = 9.0E7, cumulative cost = {9.0E7 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 237
FlinkLogicalTableSourceScan(subset=[rel#178:RelSubset#0.LOGICAL.any.None: 0.[NONE].[NONE]], table=[[default_catalog, default_database, flink_log_source]], fields=[app_id, id, log_time, message]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 4.4E9 io, 0.0 network, 0.0 memory}, id = 177
FlinkLogicalTableSourceScan(subset=[rel#182:RelSubset#3.LOGICAL.any.None: 0.[NONE].[NONE]], table=[[default_catalog, default_database, flink_log_side]], fields=[id, family]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}, id = 181

Sets:
Set#12, type: RecordType(VARCHAR(2147483647) app_id, VARCHAR(2147483647) id, BIGINT log_time, VARCHAR(2147483647) message)
rel#292:RelSubset#12.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#177
rel#177:FlinkLogicalTableSourceScan.LOGICAL.any.None: 0.[NONE].[NONE](table=[default_catalog, default_database, flink_log_source],fields=app_id, id, log_time, message), rowcount=1.0E8, cumulative cost={1.0E8 rows, 1.0E8 cpu, 4.4E9 io, 0.0 network, 0.0 memory}
rel#306:RelSubset#12.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=rel#305
rel#305:StreamExecTableSourceScan.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](table=[default_catalog, default_database, flink_log_source],fields=app_id, id, log_time, message), rowcount=1.0E8, cumulative cost={1.0E8 rows, 1.0E8 cpu, 4.4E9 io, 0.0 network, 0.0 memory}
Set#13, type: RecordType(VARCHAR(2147483647) app_id)
rel#294:RelSubset#13.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#293
rel#293:FlinkLogicalCalc.LOGICAL.any.None: 0.[NONE].[NONE](input=RelSubset#292,select=app_id,where=IS NOT NULL(id)), rowcount=9.0E7, cumulative cost={1.9E8 rows, 1.0E8 cpu, 4.4E9 io, 0.0 network, 0.0 memory}
rel#308:RelSubset#13.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=rel#307
rel#307:StreamExecCalc.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#306,select=app_id,where=IS NOT NULL(id)), rowcount=9.0E7, cumulative cost={1.9E8 rows, 1.0E8 cpu, 4.4E9 io, 0.0 network, 0.0 memory}
rel#310:AbstractConverter.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE](input=RelSubset#308,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=single,MiniBatchIntervalTraitDef=None: 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]), rowcount=9.0E7, cumulative cost={inf}
rel#318:StreamExecExchange.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE](input=RelSubset#308,distribution=single), rowcount=9.0E7, cumulative cost={2.8E8 rows, 1.459E10 cpu, 4.4E9 io, 1.08E9 network, 0.0 memory}
rel#309:RelSubset#13.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE], best=rel#318
rel#310:AbstractConverter.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE](input=RelSubset#308,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=single,MiniBatchIntervalTraitDef=None: 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]), rowcount=9.0E7, cumulative cost={inf}
rel#318:StreamExecExchange.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE](input=RelSubset#308,distribution=single), rowcount=9.0E7, cumulative cost={2.8E8 rows, 1.459E10 cpu, 4.4E9 io, 1.08E9 network, 0.0 memory}
Set#14, type: RecordType(VARCHAR(2147483647) id, RecordType:peek_no_expand(VARCHAR(2147483647) app_id, VARCHAR(2147483647) message) family)
rel#295:RelSubset#14.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#181
rel#181:FlinkLogicalTableSourceScan.LOGICAL.any.None: 0.[NONE].[NONE](table=[default_catalog, default_database, flink_log_side],fields=id, family), rowcount=1.0E8, cumulative cost={1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
rel#311:RelSubset#14.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE], best=null
Set#15, type: RecordType(VARCHAR(2147483647) app_id, VARCHAR(2147483647) id, RecordType:peek_no_expand(VARCHAR(2147483647) app_id, VARCHAR(2147483647) message) family)
rel#297:RelSubset#15.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#296
rel#296:FlinkLogicalJoin.LOGICAL.any.None: 0.[NONE].[NONE](left=RelSubset#294,right=RelSubset#295,condition=true,joinType=inner), rowcount=9.0E15, cumulative cost={3.8E8 rows, 3.9E8 cpu, 9.18E9 io, 0.0 network, 0.0 memory}
rel#313:RelSubset#15.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=null
rel#312:StreamExecJoin.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](left=RelSubset#309,right=RelSubset#311,joinType=InnerJoin,where=true,select=app_id, id, family,leftInputSpec=NoUniqueKey,rightInputSpec=HasUniqueKey), rowcount=9.0E15, cumulative cost={inf}
Set#16, type: RecordType(VARCHAR(2147483647) id, RecordType(VARCHAR(2147483647) EXPR$0, VARCHAR(2147483647) EXPR$1) EXPR$1)
rel#299:RelSubset#16.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#298
rel#298:FlinkLogicalCalc.LOGICAL.any.None: 0.[NONE].[NONE](input=RelSubset#297,select=id, ROW(family.app_id, family.message) AS EXPR$1), rowcount=9.0E15, cumulative cost={9.00000038E15 rows, 9.00000039E15 cpu, 9.18E9 io, 0.0 network, 0.0 memory}
rel#315:RelSubset#16.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=null
rel#314:StreamExecCalc.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#313,select=id, ROW(family.app_id, family.message) AS EXPR$1), rowcount=9.0E15, cumulative cost={inf}
Set#17, type: RecordType(VARCHAR(2147483647) id, RecordType(VARCHAR(2147483647) EXPR$0, VARCHAR(2147483647) EXPR$1) EXPR$1)
rel#302:RelSubset#17.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#301
rel#301:FlinkLogicalSink.LOGICAL.any.None: 0.[NONE].[NONE](input=RelSubset#299,table=default_catalog.default_database.flink_log_sink,fields=id, EXPR$1), rowcount=9.0E15, cumulative cost={1.800000038E16 rows, 1.800000039E16 cpu, 9.18E9 io, 0.0 network, 0.0 memory}
rel#303:RelSubset#17.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=null
rel#304:AbstractConverter.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#302,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=any,MiniBatchIntervalTraitDef=None: 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]), rowcount=9.0E15, cumulative cost={inf}
rel#316:StreamExecSink.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#315,table=default_catalog.default_database.flink_log_sink,fields=id, EXPR$1), rowcount=9.0E15, cumulative cost={inf}

Graphviz:
digraph G {
root [style=filled,label="Root"];
subgraph cluster12{
label="Set 12 RecordType(VARCHAR(2147483647) app_id, VARCHAR(2147483647) id, BIGINT log_time, VARCHAR(2147483647) message)";
rel177 [label="rel#177:FlinkLogicalTableSourceScan\ntable=[default_catalog, default_database, flink_log_source],fields=app_id, id, log_time, message\nrows=1.0E8, cost={1.0E8 rows, 1.0E8 cpu, 4.4E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
rel305 [label="rel#305:StreamExecTableSourceScan\ntable=[default_catalog, default_database, flink_log_source],fields=app_id, id, log_time, message\nrows=1.0E8, cost={1.0E8 rows, 1.0E8 cpu, 4.4E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
subset292 [label="rel#292:RelSubset#12.LOGICAL.any.None: 0.[NONE].[NONE]"]
subset306 [label="rel#306:RelSubset#12.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]"]
}
subgraph cluster13{
label="Set 13 RecordType(VARCHAR(2147483647) app_id)";
rel293 [label="rel#293:FlinkLogicalCalc\ninput=RelSubset#292,select=app_id,where=IS NOT NULL(id)\nrows=9.0E7, cost={1.9E8 rows, 1.0E8 cpu, 4.4E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
rel307 [label="rel#307:StreamExecCalc\ninput=RelSubset#306,select=app_id,where=IS NOT NULL(id)\nrows=9.0E7, cost={1.9E8 rows, 1.0E8 cpu, 4.4E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
rel310 [label="rel#310:AbstractConverter\ninput=RelSubset#308,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=single,MiniBatchIntervalTraitDef=None: 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]\nrows=9.0E7, cost={inf}",shape=box]
rel318 [label="rel#318:StreamExecExchange\ninput=RelSubset#308,distribution=single\nrows=9.0E7, cost={2.8E8 rows, 1.459E10 cpu, 4.4E9 io, 1.08E9 network, 0.0 memory}",color=blue,shape=box]
subset294 [label="rel#294:RelSubset#13.LOGICAL.any.None: 0.[NONE].[NONE]"]
subset308 [label="rel#308:RelSubset#13.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]"]
subset309 [label="rel#309:RelSubset#13.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE]"]
subset308 -> subset309; }
subgraph cluster14{
label="Set 14 RecordType(VARCHAR(2147483647) id, RecordType:peek_no_expand(VARCHAR(2147483647) app_id, VARCHAR(2147483647) message) family)";
rel181 [label="rel#181:FlinkLogicalTableSourceScan\ntable=[default_catalog, default_database, flink_log_side],fields=id, family\nrows=1.0E8, cost={1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
subset295 [label="rel#295:RelSubset#14.LOGICAL.any.None: 0.[NONE].[NONE]"]
subset311 [label="rel#311:RelSubset#14.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE]",color=red]
}
subgraph cluster15{
label="Set 15 RecordType(VARCHAR(2147483647) app_id, VARCHAR(2147483647) id, RecordType:peek_no_expand(VARCHAR(2147483647) app_id, VARCHAR(2147483647) message) family)";
rel296 [label="rel#296:FlinkLogicalJoin\nleft=RelSubset#294,right=RelSubset#295,condition=true,joinType=inner\nrows=9.0E15, cost={3.8E8 rows, 3.9E8 cpu, 9.18E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
rel312 [label="rel#312:StreamExecJoin\nleft=RelSubset#309,right=RelSubset#311,joinType=InnerJoin,where=true,select=app_id, id, family,leftInputSpec=NoUniqueKey,rightInputSpec=HasUniqueKey\nrows=9.0E15, cost={inf}",shape=box]
subset297 [label="rel#297:RelSubset#15.LOGICAL.any.None: 0.[NONE].[NONE]"]
subset313 [label="rel#313:RelSubset#15.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]"]
}
subgraph cluster16{
label="Set 16 RecordType(VARCHAR(2147483647) id, RecordType(VARCHAR(2147483647) EXPR$0, VARCHAR(2147483647) EXPR$1) EXPR$1)";
rel298 [label="rel#298:FlinkLogicalCalc\ninput=RelSubset#297,select=id, ROW(family.app_id, family.message) AS EXPR$1\nrows=9.0E15, cost={9.00000038E15 rows, 9.00000039E15 cpu, 9.18E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
rel314 [label="rel#314:StreamExecCalc\ninput=RelSubset#313,select=id, ROW(family.app_id, family.message) AS EXPR$1\nrows=9.0E15, cost={inf}",shape=box]
subset299 [label="rel#299:RelSubset#16.LOGICAL.any.None: 0.[NONE].[NONE]"]
subset315 [label="rel#315:RelSubset#16.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]"]
}
subgraph cluster17{
label="Set 17 RecordType(VARCHAR(2147483647) id, RecordType(VARCHAR(2147483647) EXPR$0, VARCHAR(2147483647) EXPR$1) EXPR$1)";
rel301 [label="rel#301:FlinkLogicalSink\ninput=RelSubset#299,table=default_catalog.default_database.flink_log_sink,fields=id, EXPR$1\nrows=9.0E15, cost={1.800000038E16 rows, 1.800000039E16 cpu, 9.18E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
rel304 [label="rel#304:AbstractConverter\ninput=RelSubset#302,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=any,MiniBatchIntervalTraitDef=None: 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]\nrows=9.0E15, cost={inf}",shape=box]
rel316 [label="rel#316:StreamExecSink\ninput=RelSubset#315,table=default_catalog.default_database.flink_log_sink,fields=id, EXPR$1\nrows=9.0E15, cost={inf}",shape=box]
subset302 [label="rel#302:RelSubset#17.LOGICAL.any.None: 0.[NONE].[NONE]"]
subset303 [label="rel#303:RelSubset#17.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]"]
}
root -> subset303;
subset292 -> rel177[color=blue];
subset306 -> rel305[color=blue];
subset294 -> rel293[color=blue]; rel293 -> subset292[color=blue];
subset308 -> rel307[color=blue]; rel307 -> subset306[color=blue];
subset309 -> rel310; rel310 -> subset308;
subset309 -> rel318[color=blue]; rel318 -> subset308[color=blue];
subset295 -> rel181[color=blue];
subset297 -> rel296[color=blue]; rel296 -> subset294[color=blue,label="0"]; rel296 -> subset295[color=blue,label="1"];
subset313 -> rel312; rel312 -> subset309[label="0"]; rel312 -> subset311[label="1"];
subset299 -> rel298[color=blue]; rel298 -> subset297[color=blue];
subset315 -> rel314; rel314 -> subset313;
subset302 -> rel301[color=blue]; rel301 -> subset299[color=blue];
subset303 -> rel304; rel304 -> subset302;
subset303 -> rel316; rel316 -> subset315;
}
at org.apache.calcite.plan.volcano.RelSubset$CheapestPlanReplacer.visit(RelSubset.java:742)
at org.apache.calcite.plan.volcano.RelSubset.buildCheapestPlan(RelSubset.java:365)
at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:520)
at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312)
at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
... 32 more
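
A possible workaround for the parse error in the first message, as an untested sketch. The ParenthesizedSimpleIdentifierList frame in that stack trace suggests that, in this Flink version, the parser accepts only unqualified column names inside Row(...). Under that assumption, projecting the joined columns in a subquery first lets the outer query refer to them without the alias b. The subquery alias t and the column aliases are made up for illustration; the table and column names are taken from the simplified statement. This only targets the parser error, not the planner exception reported above for the view.

```sql
-- Assumption: ROW(...) parses when its arguments are plain column names.
-- The inner query resolves the table alias; the outer query builds the row.
INSERT INTO flink_log_sink
SELECT
    id,
    ROW(app_id, message)
FROM (
    SELECT
        b.id AS id,
        b.app_id AS app_id,
        b.message AS message
    FROM flink_log_source a
    JOIN flink_log_side b
    ON a.id = b.id
) t;
```

If the sink's second column is declared as a ROW type with named fields, the positional row built here would still have to match it in field count and types.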


On 01/11/2021 11:04, [hidden email] wrote: (original message, identical to the first post above)