This may be due to not understanding lateral joins in Flink – perhaps you can only do so on temporal variables – but I figured I'd ask since the error message isn't intuitive. I am trying to do a combination of a lateral join and a top-N query. Part of my ordering is based upon whether a value in the left side of the query matches up. I'm trying to do this in the general form of:

SELECT t1.id, t1.attr1, t2.attr2
FROM table1 t1
LEFT JOIN LATERAL (
  SELECT id, attr2
  FROM (
    SELECT id, attr2,
      ROW_NUMBER() OVER (
        PARTITION BY id
        ORDER BY attr3 DESC, t1.attr4 = attr4 DESC
      ) AS row_num
    FROM table2)
  WHERE row_num = 1) t2
ON (t1.id = t2.id)

I am getting an error that looks like:

Exception in thread "main" org.apache.flink.table.api.TableException: unexpected correlate variable $cor2 in the plan
    at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:58)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
    at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187)
    at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189)
    at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
    at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
    at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:294)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
    at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:178)
    at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:113)
    at org.apache.flink.table.api.bridge.scala.TableConversions.toRetractStream(TableConversions.scala:97)
    at io.oseberg.flink.well.ok.Job$.main(Job.scala:57)
    at io.oseberg.flink.well.ok.Job.main(Job.scala)

The only other thing I can think of doing is creating a Table Aggregate function to pull this off. But I wanted to check first to make sure I wasn't doing something wrong in the above, or that there isn't something I'm not thinking of.

Regards,
Dylan Forciea
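A quick sketch of the Table Aggregate route mentioned above, for anyone curious: the following is untested, and every name in it (Top1Row, Top1Acc, the column types) is assumed for illustration rather than taken from this thread. It keeps one row per group, ordered by attr4 descending with the attr2 match flag as tie-break – the same ordering the query expresses:

import java.time.LocalDate
import org.apache.flink.table.functions.TableAggregateFunction
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

// Accumulator holding the current best row (field names are made up).
class Top1Acc {
  var attr2: String = _
  var attr3: java.math.BigDecimal = _
  var attr4: LocalDate = _
  var matched: Boolean = false
  var empty: Boolean = true
}

class Top1Row extends TableAggregateFunction[Row, Top1Acc] {
  override def createAccumulator(): Top1Acc = new Top1Acc

  def accumulate(acc: Top1Acc, attr2: String, attr3: java.math.BigDecimal,
                 attr4: LocalDate, matched: Boolean): Unit = {
    if (beats(attr4, matched, acc)) {
      acc.attr2 = attr2
      acc.attr3 = attr3
      acc.attr4 = attr4
      acc.matched = matched
      acc.empty = false
    }
  }

  // Emit only the single retained row per group.
  def emitValue(acc: Top1Acc, out: Collector[Row]): Unit = {
    if (!acc.empty) out.collect(Row.of(acc.attr2, acc.attr3))
  }

  // DESC NULLS LAST semantics: any non-null attr4 beats null; ties fall
  // back to whether the attr2 values matched.
  private def beats(attr4: LocalDate, matched: Boolean, acc: Top1Acc): Boolean = {
    if (acc.empty) return true
    val cmp =
      if (attr4 == null && acc.attr4 == null) 0
      else if (attr4 == null) -1
      else if (acc.attr4 == null) 1
      else attr4.compareTo(acc.attr4)
    cmp > 0 || (cmp == 0 && matched && !acc.matched)
  }
}

Wiring this in would go through the Table API rather than SQL (a groupBy on the join key followed by flatAggregate), and whether a given Flink version supports Row-typed table aggregates this way would need checking.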
Hi Dylan,

Could you tell us which Flink version you are seeing this problem on? I tested the above query on master and the plan is generated with no errors. Here is my test case:

@Test

Best,
Godfrey
Godfrey,

I was using Flink 1.11.2, but I just tried switching to 1.12-SNAPSHOT and am still having the same issue. Note that I am using the JDBC connector for the input tables, and table1 and table2 are actually created from queries on those connector tables and not directly. Since you indicated that what I did should work, I played around a bit more and determined that it's something inside the table2 query that is triggering the error. The id field there is generated by a table function; removing that piece made the plan start working. Table 2 is formulated as follows:

SELECT T.id, attr2, attr4
FROM table3 t3,
  LATERAL TABLE(SplitStringToRows(t3.id, ';')) AS T(id)

where SplitStringToRows is defined as:

@FunctionHint(output = new DataTypeHint("ROW<val STRING>"))
class SplitStringToRows extends TableFunction[Row] {
  def eval(str: String, separator: String = ";"): Unit = {
    if (str != null) {
      str.split(separator).foreach(s => collect(Row.of(s.trim())))
    }
  }
}

Removing the LATERAL TABLE bit from that first table made the original query plan work correctly.

I greatly appreciate your assistance!

Regards,
Dylan Forciea
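To make the expansion concrete, here is an untested sketch with made-up sample data (table3_sample and the values are illustrative, and the availability of fromValues/row depends on the Flink version; it assumes the same imports as the full job further down):

// One input row whose id column holds three ';'-delimited tokens.
val sample = streamTableEnv.fromValues(
  DataTypes.ROW(DataTypes.FIELD("id", DataTypes.STRING())),
  row("a; b;c")
)
streamTableEnv.createTemporaryView("table3_sample", sample)

// The lateral join emits one row per token, trimmed: 'a', 'b', 'c'.
val split = streamTableEnv.sqlQuery(
  """SELECT T.id
    |FROM table3_sample t3,
    |  LATERAL TABLE(SplitStringToRows(t3.id, ';')) AS T(id)""".stripMargin)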
Dylan,

Thanks for your feedback. If the planner encounters an "unexpected correlate variable $cor2 in the plan" exception, there is a high probability that FlinkDecorrelateProgram has a bug or that the query pattern is not supported yet. I tried using the JDBC connector for the input tables, but I still can't reproduce the exception. Could you provide your full code, including DDL, queries, etc.? Thanks so much.

Best,
Godfrey
Godfrey,

I confirmed that in Flink 1.11.2 and in 1.12-SNAPSHOT I get the stack trace running exactly this code:

import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row
import org.apache.flink.table.annotation.FunctionHint
import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.functions.TableFunction

@FunctionHint(output = new DataTypeHint("ROW<val STRING>"))
class SplitStringToRows extends TableFunction[Row] {
  def eval(str: String, separator: String = ";"): Unit = {
    if (str != null) {
      str.split(separator).foreach(s => collect(Row.of(s.trim())))
    }
  }
}

object Job {
  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings)

    streamTableEnv.createTemporarySystemFunction(
      "SplitStringToRows",
      classOf[SplitStringToRows]
    ) // Class defined in previous email

    streamTableEnv.executeSql("""
      CREATE TABLE table1 (
        id_source BIGINT PRIMARY KEY,
        attr1_source STRING,
        attr2 STRING
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://host.domain.com/db1?ssl=true',
        'table-name' = '<table>',
        'username' = '<username>',
        'password' = '<password>',
        'scan.fetch-size' = '500',
        'scan.auto-commit' = 'false')
    """)

    streamTableEnv.executeSql("""
      CREATE TABLE table2 (
        attr1_source STRING,
        attr2 STRING,
        attr3 DECIMAL,
        attr4 DATE
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://host.domain.com/db1?ssl=true',
        'table-name' = '<table>',
        'username' = '<username>',
        'password' = '<password>',
        'scan.fetch-size' = '500',
        'scan.auto-commit' = 'false')
    """)

    val q1 = streamTableEnv.sqlQuery("""
      SELECT id_source AS id, attr1_source AS attr1, attr2
      FROM table1
    """)
    streamTableEnv.createTemporaryView("view1", q1)

    val q2 = streamTableEnv.sqlQuery("""
      SELECT a.attr1 AS attr1, attr2, attr3, attr4
      FROM table2 p,
        LATERAL TABLE(SplitStringToRows(p.attr1_source, ';')) AS a(attr1)
    """)
    streamTableEnv.createTemporaryView("view2", q2)

    val q3 = streamTableEnv.sqlQuery("""
      SELECT w.attr1, p.attr3
      FROM view1 w
      LEFT JOIN LATERAL (
        SELECT attr1, attr3
        FROM (
          SELECT attr1, attr3,
            ROW_NUMBER() OVER (
              PARTITION BY attr1
              ORDER BY attr4 DESC NULLS LAST, w.attr2 = attr2 DESC NULLS LAST
            ) AS row_num
          FROM view2)
        WHERE row_num = 1) p
      ON (w.attr1 = p.attr1)
    """)
    streamTableEnv.createTemporaryView("view3", q3)

    val view3 = streamTableEnv.from("view3")
    view3
      .toRetractStream[Row]
      .writeAsCsv("./view3.csv", WriteMode.OVERWRITE)
      .setParallelism(1)

    streamEnv.execute()
  }
}

Thanks,
Dylan Forciea
Hi Dylan,

I have reproduced your issue based on your code. Currently Flink does not support such nested correlate patterns; I have created an issue to track this [1]. Thanks for your report and your help.

Best,
Godfrey
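For readers hitting the same limitation before that issue is resolved: one possible rewrite that avoids nesting the correlated variable inside the derived table is to join first and rank afterwards. This is an untested sketch, and it assumes attr1 uniquely identifies rows of view1, so that partitioning by attr1 stands in for the per-left-row scoping of the lateral join:

SELECT attr1, attr3
FROM (
  SELECT w.attr1, p.attr3,
    ROW_NUMBER() OVER (
      PARTITION BY w.attr1
      ORDER BY p.attr4 DESC NULLS LAST, (w.attr2 = p.attr2) DESC NULLS LAST
    ) AS row_num
  FROM view1 w
  LEFT JOIN view2 p ON (w.attr1 = p.attr1)
)
WHERE row_num = 1

Whether the streaming planner recognizes this exact shape as a Top-N query would still need to be verified against real data.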
Godfrey,

Glad I could help! I suspected that was what the problem was. I have made a view in my Postgres database to perform the inner lateral join, so that should let me work around this for the time being.

Thanks,
Dylan
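For anyone replicating that workaround, a database-side view along these lines moves the string splitting into Postgres before Flink reads the table. This is a sketch with illustrative names (regexp_split_to_table is standard PostgreSQL, but table2_raw and the column names here merely echo the earlier DDL):

-- Hypothetical Postgres view doing the split that the Flink table
-- function used to do: one output row per ';'-delimited token, trimmed.
CREATE VIEW table2_split AS
SELECT trim(t.attr1) AS attr1, p.attr2, p.attr3, p.attr4
FROM table2_raw p,
  LATERAL regexp_split_to_table(p.attr1_source, ';') AS t(attr1);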