Lateral join not finding correlate variable

Lateral join not finding correlate variable

Dylan Forciea

This may be due to my not understanding lateral joins in Flink (perhaps you can only use them on temporal variables), but I figured I’d ask since the error message isn’t intuitive.

I am trying to combine a lateral join with a top-N query. Part of my ordering is based on whether a value on the left side of the query matches the corresponding value on the right side. I’m trying to do this in the general form of:

SELECT
  t1.id,
  t1.attr1,
  t2.attr2
FROM table1 t1
LEFT JOIN LATERAL (
  SELECT
    id,
    attr2
  FROM (
    SELECT
      id,
      attr2,
      ROW_NUMBER() OVER (
        PARTITION BY id
        ORDER BY
          attr3 DESC,
          t1.attr4 = attr4 DESC
      ) AS row_num
    FROM table2)
  WHERE row_num = 1) t2
ON (t1.id = t2.id)

I am getting an error that looks like:

Exception in thread "main" org.apache.flink.table.api.TableException: unexpected correlate variable $cor2 in the plan
     at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:58)
     at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42)
     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
     at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187)
     at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185)
     at scala.collection.Iterator.foreach(Iterator.scala:943)
     at scala.collection.Iterator.foreach$(Iterator.scala:943)
     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189)
     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184)
     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
     at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
     at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:294)
     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
     at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:178)
     at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:113)
     at org.apache.flink.table.api.bridge.scala.TableConversions.toRetractStream(TableConversions.scala:97)
     at io.oseberg.flink.well.ok.Job$.main(Job.scala:57)
     at io.oseberg.flink.well.ok.Job.main(Job.scala)

The only other thing I can think of is creating a table aggregate function to pull this off, but I wanted to check first whether I was doing something wrong above or missing another approach.

Regards,
Dylan Forciea

Re: Lateral join not finding correlate variable

godfrey he
Hi Dylan,

Which Flink version are you seeing the problem with?
I tested the above query on master and got a plan; no errors occurred.
Here is my test case:
@Test
def testLateralJoin(): Unit = {
  util.addTableSource[(String, String, String, String, String)]("table1", 'id, 'attr1, 'attr2, 'attr3, 'attr4)
  util.addTableSource[(String, String, String, String, String)]("table2", 'id, 'attr1, 'attr2, 'attr3, 'attr4)
  val query =
    """
      |SELECT
      |  t1.id,
      |  t1.attr1,
      |  t2.attr2
      |FROM table1 t1
      |LEFT JOIN LATERAL (
      |  SELECT
      |    id,
      |    attr2
      |  FROM (
      |    SELECT
      |      id,
      |      attr2,
      |      ROW_NUMBER() OVER (
      |        PARTITION BY id
      |        ORDER BY
      |          attr3 DESC,
      |          t1.attr4 = attr4 DESC
      |      ) AS row_num
      |    FROM table2)
      |  WHERE row_num = 1) t2
      |ON t1.id = t2.id
      |""".stripMargin
  util.verifyPlan(query)
}
Best,
Godfrey

On Wed, Nov 18, 2020 at 7:44 AM, Dylan Forciea <[hidden email]> wrote:


Re: Lateral join not finding correlate variable

Dylan Forciea

Godfrey,

I was using Flink 1.11.2, but I just tried switching to 1.12-SNAPSHOT and am still having the same issue. Note that I am using the JDBC Connector for the input tables, and table1 and table2 are actually created from queries on those connector tables and not directly.

Since you indicated what I did should work, I played around a bit more, and determined it’s something inside of the table2 query that is triggering the error. The id field there is generated by a table function. Removing that piece made the plan start working. Table 2 is formulated as follows:

SELECT
  T.id,
  attr2,
  attr3,
  attr4
FROM table3 t3, LATERAL TABLE(SplitStringToRows(t3.id, ';')) AS T(id)

 

Where SplitStringToRows is defined as:

@FunctionHint(output = new DataTypeHint("ROW<val STRING>"))
class SplitStringToRows extends TableFunction[Row] {

  def eval(str: String, separator: String = ";"): Unit = {
    if (str != null) {
      str.split(separator).foreach(s => collect(Row.of(s.trim())))
    }
  }
}
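
For readers following along, a minimal plain-Scala sketch of what the body of eval does to a single input value may help. It has no Flink dependency; SplitSketch and splitToValues are hypothetical names used only for illustration and are not part of the original job.

```scala
// Hypothetical helper mirroring the body of SplitStringToRows.eval, without Flink.
object SplitSketch {

  // Returns the trimmed pieces that eval would emit as one row each.
  // A null input yields no rows, matching the null check in eval.
  def splitToValues(str: String, separator: String = ";"): List[String] =
    if (str == null) Nil
    else str.split(separator).map(_.trim).toList

  def main(args: Array[String]): Unit = {
    // One input value fans out into several values (one row each in the table function).
    splitToValues("a; b ;c").foreach(println)
  }
}
```

Because String.split interprets its argument as a regular expression, a separator such as "|" would need to be escaped (for example with java.util.regex.Pattern.quote) before being passed in.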

 

Removing the lateral table bit in that first table made the original query plan work correctly.

I greatly appreciate your assistance!

Regards,
Dylan Forciea

 

From: godfrey he <[hidden email]>
Date: Wednesday, November 18, 2020 at 7:33 AM
To: Dylan Forciea <[hidden email]>
Cc: "[hidden email]" <[hidden email]>
Subject: Re: Lateral join not finding correlate variable

 


Re: Lateral join not finding correlate variable

godfrey he
Dylan,

Thanks for your feedback. If the planner throws the
"unexpected correlate variable $cor2 in the plan" exception,
there is a high probability that FlinkDecorrelateProgram has a bug
or that the query pattern is not supported yet. I tried using the JDBC Connector for the input tables,
but I still could not reproduce the exception. Could you provide your full code, including DDL, query, etc.?
Thanks so much.

Best,
Godfrey
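
One way to surface this kind of planner failure before wiring up any sinks is to ask the planner for the plan and capture whatever exception it throws. The sketch below is an illustration under stated assumptions, not code from this thread: PlanCheck and planOrError are hypothetical names, and the explain function is passed in as a parameter so the snippet compiles without Flink. With Flink 1.11 or later on the classpath, it is assumed that sql => streamTableEnv.explainSql(sql) could be supplied, on the expectation that explaining a query runs the same optimizer programs as translating it.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical helper: wraps any "SQL in, plan text out" function and
// turns a planner exception into a readable Left value.
object PlanCheck {

  // `explain` is assumed to be something like
  //   sql => streamTableEnv.explainSql(sql)
  // when Flink is available; here it is just a plain function.
  def planOrError(explain: String => String, sql: String): Either[String, String] =
    Try(explain(sql)) match {
      case Success(plan) => Right(plan)
      case Failure(e)    => Left(s"${e.getClass.getSimpleName}: ${e.getMessage}")
    }
}
```

Checking each intermediate view this way (the table function view on its own, then the lateral top-N query on top of it) is one way to narrow down which part of a query triggers the error.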



On Wed, Nov 18, 2020 at 10:09 PM, Dylan Forciea <[hidden email]> wrote:


Re: Lateral join not finding correlate variable

Dylan Forciea

Godfrey,

I confirmed that in Flink 1.11.2 and in 1.12-SNAPSHOT I get the stack trace running exactly this code:

import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row
import org.apache.flink.table.annotation.FunctionHint
import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.functions.TableFunction

@FunctionHint(output = new DataTypeHint("ROW<val STRING>"))
class SplitStringToRows extends TableFunction[Row] {
  def eval(str: String, separator: String = ";"): Unit = {
    if (str != null) {
      str.split(separator).foreach(s => collect(Row.of(s.trim())))
    }
  }
}

object Job {

  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings)

    streamTableEnv.createTemporarySystemFunction(
      "SplitStringToRows",
      classOf[SplitStringToRows]
    ) // Class defined in previous email

    streamTableEnv.executeSql(
      """
      CREATE TABLE table1 (
        id_source BIGINT PRIMARY KEY,
        attr1_source STRING,
        attr2 STRING
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://host.domain.com/db1?ssl=true',
        'table-name' = '<table>',
        'username' = '<username>',
        'password' = '<password>',
        'scan.fetch-size' = '500',
        'scan.auto-commit' = 'false')
    """)

    streamTableEnv.executeSql(
      """
      CREATE TABLE table2 (
        attr1_source STRING,
        attr2 STRING,
        attr3 DECIMAL,
        attr4 DATE
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://host.domain.com/db1?ssl=true',
        'table-name' = '<table>',
        'username' = '<username>',
        'password' = '<password>',
        'scan.fetch-size' = '500',
        'scan.auto-commit' = 'false')
    """)

    val q1 = streamTableEnv.sqlQuery("""
      SELECT
        id_source AS id,
        attr1_source AS attr1,
        attr2
      FROM table1
    """)
    streamTableEnv.createTemporaryView("view1", q1)

    val q2 = streamTableEnv.sqlQuery(
      """
        SELECT
          a.attr1 AS attr1,
          attr2,
          attr3,
          attr4
        FROM table2 p, LATERAL TABLE(SplitStringToRows(p.attr1_source, ';')) AS a(attr1)
    """)
    streamTableEnv.createTemporaryView("view2", q2)

    val q3 = streamTableEnv.sqlQuery("""
        SELECT
          w.attr1,
          p.attr3
        FROM view1 w
        LEFT JOIN LATERAL (
          SELECT
            attr1,
            attr3
          FROM (
            SELECT
              attr1,
              attr3,
              ROW_NUMBER() OVER (
                PARTITION BY attr1
                ORDER BY
                  attr4 DESC NULLS LAST,
                  w.attr2 = attr2 DESC NULLS LAST
              ) AS row_num
            FROM view2)
          WHERE row_num = 1) p
        ON (w.attr1 = p.attr1)
        """)
    streamTableEnv.createTemporaryView("view3", q3)

    val view3 = streamTableEnv.from("view3")

    view3
      .toRetractStream[Row]
      .writeAsCsv("./view3.csv", WriteMode.OVERWRITE)
      .setParallelism(1)

    streamEnv.execute()
  }
}

Thanks,
Dylan Forciea

 

From: godfrey he <[hidden email]>
Date: Wednesday, November 18, 2020 at 8:29 PM
To: Dylan Forciea <[hidden email]>
Cc: "[hidden email]" <[hidden email]>
Subject: Re: Lateral join not finding correlate variable

 



Re: Lateral join not finding correlate variable

godfrey he
Hi Dylan,

I have reproduced your issue based on your code.
Currently, Flink does not support this kind of nested correlated query pattern.
I have created an issue to track this [1].
Thanks for reporting it and for your help.


Best,
Godfrey

On Thu, Nov 19, 2020 at 12:10 PM, Dylan Forciea <[hidden email]> wrote:

Godfrey,

 

I confirmed that in Flink 1.11.2 and in 1.12-SNAPSHOT I get the stack trace running exactly this code:

 

import org.apache.flink.api.scala._

import org.apache.flink.core.fs.FileSystem.WriteMode

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import org.apache.flink.table.api._

import org.apache.flink.table.api.bridge.scala._

import org.apache.flink.types.Row

import org.apache.flink.table.annotation.FunctionHint

import org.apache.flink.table.annotation.DataTypeHint

import org.apache.flink.table.functions.TableFunction

 

 

@FunctionHint(output = new DataTypeHint("ROW<val STRING>"))

class SplitStringToRows extends TableFunction[Row] {

  def eval(str: String, separator: String = ";"): Unit = {

    if (str != null) {

      str.split(separator).foreach(s => collect(Row.of(s.trim())))

    }

  }

}

object Job {

 

  def main(args: Array[String]): Unit = {

    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()

    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

    val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings)

 

    streamTableEnv.createTemporarySystemFunction(

      "SplitStringToRows",

      classOf[SplitStringToRows]

    ) // Class defined above

 

    streamTableEnv.executeSql(

      """

      CREATE TABLE table1 (

        id_source BIGINT PRIMARY KEY,

        attr1_source STRING,

        attr2 STRING

      ) WITH (

       'connector' = 'jdbc',

       'url' = 'jdbc:postgresql://host.domain.com/db1?ssl=true',

       'table-name' = '<table>',

       'username' = '<username>',

       'password' = '<password>',

       'scan.fetch-size' = '500',

       'scan.auto-commit' = 'false')

    """)

 

    streamTableEnv.executeSql(

      """

      CREATE TABLE table2 (

        attr1_source STRING,

        attr2 STRING,

        attr3 DECIMAL,

        attr4 DATE

      ) WITH (

       'connector' = 'jdbc',

       'url' = 'jdbc:postgresql://host.domain.com/db1?ssl=true',

       'table-name' = '<table>',

       'username' = '<username>',

       'password' = '<password>',

       'scan.fetch-size' = '500',

       'scan.auto-commit' = 'false')

    """)

 

    val q1 = streamTableEnv.sqlQuery("""

      SELECT

        id_source AS id,

        attr1_source AS attr1,

        attr2

      FROM table1

    """)

    streamTableEnv.createTemporaryView("view1", q1)

 

    val q2 = streamTableEnv.sqlQuery(

      """

        SELECT

          a.attr1 AS attr1,

          attr2,

          attr3,

          attr4

        FROM table2 p, LATERAL TABLE(SplitStringToRows(p.attr1_source, ';')) AS a(attr1)

    """)

    streamTableEnv.createTemporaryView("view2", q2)

 

    val q3 = streamTableEnv.sqlQuery("""

        SELECT

          w.attr1,

          p.attr3

        FROM view1 w

        LEFT JOIN LATERAL (

          SELECT

            attr1,

            attr3

          FROM (

            SELECT

              attr1,

              attr3,

              ROW_NUMBER() OVER (

                PARTITION BY attr1

                ORDER BY

                  attr4 DESC NULLS LAST,

                  w.attr2 = attr2 DESC NULLS LAST

              ) AS row_num

          FROM view2)

          WHERE row_num = 1) p

        ON (w.attr1 = p.attr1)

        """)

    streamTableEnv.createTemporaryView("view3", q3)

 

    val view3 = streamTableEnv.from("view3")

 

    view3

      .toRetractStream[Row]

      .writeAsCsv("./view3.csv", WriteMode.OVERWRITE)

      .setParallelism(1)

 

    streamEnv.execute()

  }

}

 

Thanks,

Dylan Forciea

 

From: godfrey he <[hidden email]>
Date: Wednesday, November 18, 2020 at 8:29 PM
To: Dylan Forciea <[hidden email]>
Cc: "[hidden email]" <[hidden email]>
Subject: Re: Lateral join not finding correlate variable

 

Dylan,

 

Thanks for your feedback. If the planner throws the "unexpected correlate variable $cor2 in the plan" exception, there is a high probability that FlinkDecorrelateProgram has a bug or that the query pattern is not supported yet. I tried using the JDBC connector for the input tables, but I still could not reproduce the exception. Could you provide your full code, including the DDL, the query, etc.?

Thanks so much.

 

Best,

Godfrey

 

 

 

On Wed, Nov 18, 2020 at 10:09 PM, Dylan Forciea <[hidden email]> wrote:

Godfrey,

 

I was using Flink 1.11.2, but I just tried switching to 1.12-SNAPSHOT and am still having the same issue. Note that I am using the JDBC Connector for the input tables, and table1 and table2 are actually created from queries on those connector tables and not directly.

 

Since you indicated what I did should work, I played around a bit more, and determined it’s something inside of the table2 query that is triggering the error. The id field there is generated by a table function. Removing that piece made the plan start working. Table 2 is formulated as follows:

 

SELECT

  T.id,

  attr2,
  attr3,

  attr4

FROM table3 t3, LATERAL TABLE(SplitStringToRows(t3.id, ';')) AS T(id)

 

Where SplitStringToRows is defined as:

 

@FunctionHint(output = new DataTypeHint("ROW<val STRING>"))

class SplitStringToRows extends TableFunction[Row] {

 

  def eval(str: String, separator: String = ";"): Unit = {

    if (str != null) {

      str.split(separator).foreach(s => collect(Row.of(s.trim())))

    }

  }

}

 

Removing the lateral table bit in that first table made the original query plan work correctly.

 

I greatly appreciate your assistance!

 

Regards,

Dylan Forciea

 

From: godfrey he <[hidden email]>
Date: Wednesday, November 18, 2020 at 7:33 AM
To: Dylan Forciea <[hidden email]>
Cc: "[hidden email]" <[hidden email]>
Subject: Re: Lateral join not finding correlate variable

 

Hi Dylan,

 

Which Flink version are you seeing the problem with?

I tested the above query on master and got a plan with no errors.

Here is my test case:

@Test
def testLateralJoin(): Unit = {
  util.addTableSource[(String, String, String, String, String)]("table1", 'id, 'attr1, 'attr2, 'attr3, 'attr4)
  util.addTableSource[(String, String, String, String, String)]("table2", 'id, 'attr1, 'attr2, 'attr3, 'attr4)
  val query =
    """
      |SELECT
      |  t1.id,
      |  t1.attr1,
      |  t2.attr2
      |FROM table1 t1
      |LEFT JOIN LATERAL (
      |  SELECT
      |    id,
      |    attr2
      |  FROM (
      |    SELECT
      |      id,
      |      attr2,
      |      ROW_NUMBER() OVER (
      |        PARTITION BY id
      |        ORDER BY
      |          attr3 DESC,
      |          t1.attr4 = attr4 DESC
      |      ) AS row_num
      |    FROM table2)
      |  WHERE row_num = 1) t2
      |ON t1.id = t2.id
      |""".stripMargin
  util.verifyPlan(query)
}

Best,

Godfrey

 

On Wed, Nov 18, 2020 at 7:44 AM, Dylan Forciea <[hidden email]> wrote:

This may be due to not understanding lateral joins in Flink – perhaps you can only do so on temporal variables – but I figured I’d ask since the error message isn’t intuitive.

 

I am trying to do a combination of a lateral join and a top N query. Part of my ordering is based upon whether a value on the left side of the query matches a value on the right side. I’m trying to do this in the general form of:

 

SELECT

  t1.id,

  t1.attr1,

  t2.attr2

FROM table1 t1

LEFT JOIN LATERAL (

  SELECT

    id,

    attr2

  FROM (

    SELECT

      id,

      attr2,

      ROW_NUMBER() OVER (

        PARTITION BY id
        ORDER BY

          attr3 DESC,

          t1.attr4 = attr4 DESC

      ) AS row_num

    FROM table2)

    WHERE row_num = 1) t2

ON (t1.id = t2.id)
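
For readers following along, the result the query above is aiming for can be sketched in plain Scala, with no Flink involved. This is only an illustration of the intended semantics: the case classes `LeftRow` and `RightRow`, the object name, and the column types are hypothetical stand-ins for table1 and table2, and SQL NULL handling is ignored.

```scala
// Plain-Scala sketch of the intended "lateral join + top 1" semantics.
// LeftRow/RightRow are hypothetical stand-ins for table1/table2 rows.
object LateralTopOneSketch {
  final case class LeftRow(id: String, attr1: String, attr4: String)
  final case class RightRow(id: String, attr2: String, attr3: Int, attr4: String)

  // For one left row: among the right rows with the same id, order by
  // attr3 DESC, then by (left.attr4 == right.attr4) DESC (matches first),
  // and keep only the first row, i.e. row_num = 1.
  def topMatch(l: LeftRow, rights: Seq[RightRow]): Option[RightRow] =
    rights
      .filter(_.id == l.id)
      .sortBy(r => (-r.attr3, if (r.attr4 == l.attr4) 0 else 1))
      .headOption

  // LEFT JOIN semantics: every left row is kept; attr2 is None when
  // no right row shares its id.
  def leftJoinLateral(
      lefts: Seq[LeftRow],
      rights: Seq[RightRow]): Seq[(String, String, Option[String])] =
    lefts.map(l => (l.id, l.attr1, topMatch(l, rights).map(_.attr2)))
}
```

The second sort key is the part that depends on the left row, which is what makes the subquery correlated.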

 

I am getting an error that looks like:

 

Exception in thread "main" org.apache.flink.table.api.TableException: unexpected correlate variable $cor2 in the plan

     at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:58)

     at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42)

     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)

     at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187)

     at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185)

     at scala.collection.Iterator.foreach(Iterator.scala:943)

     at scala.collection.Iterator.foreach$(Iterator.scala:943)

     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)

     at scala.collection.IterableLike.foreach(IterableLike.scala:74)

     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)

     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)

     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189)

     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184)

     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)

     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)

     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)

     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)

     at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)

     at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:294)

     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)

     at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:178)

     at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:113)

     at org.apache.flink.table.api.bridge.scala.TableConversions.toRetractStream(TableConversions.scala:97)

     at io.oseberg.flink.well.ok.Job$.main(Job.scala:57)

     at io.oseberg.flink.well.ok.Job.main(Job.scala)

 

The only other thing I can think of doing is creating a Table Aggregate function to pull this off. But I wanted to check first that I wasn’t doing something wrong in the above, or whether there is another approach I’m not thinking of.

 

Regards,

Dylan Forciea


Re: Lateral join not finding correlate variable

Dylan Forciea

Godfrey,

 

Glad I could help! I suspected that was what the problem was. I have made a view in my Postgres database to perform the inner lateral join, so that should let me work around this for the time being.
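
For anyone hitting the same error, a workaround along these lines might look roughly like the sketch below. It is not the exact view used here: the names `table2_split` and `table2_source` are hypothetical, and it assumes the Flink JDBC connector can read from a Postgres view in the same way as from a table. The two statements are kept as Scala strings; the first would be run in Postgres, the second passed to `streamTableEnv.executeSql` in place of the `view2` query.

```scala
// Sketch only: view and table names below are hypothetical.
object PostgresViewWorkaround {
  // Run once in Postgres. This does the string split that view2 did on the
  // Flink side with LATERAL TABLE(SplitStringToRows(...)).
  val postgresViewDdl: String =
    """CREATE VIEW table2_split AS
      |SELECT
      |  trim(a.attr1) AS attr1,
      |  p.attr2,
      |  p.attr3,
      |  p.attr4
      |FROM table2_source p
      |CROSS JOIN LATERAL unnest(string_to_array(p.attr1_source, ';')) AS a(attr1)
      |""".stripMargin

  // Flink DDL that reads the already-split rows from the Postgres view, so
  // the Flink plan no longer has a table function under the lateral subquery.
  val flinkTableDdl: String =
    """CREATE TABLE view2 (
      |  attr1 STRING,
      |  attr2 STRING,
      |  attr3 DECIMAL,
      |  attr4 DATE
      |) WITH (
      |  'connector' = 'jdbc',
      |  'url' = 'jdbc:postgresql://host.domain.com/db1?ssl=true',
      |  'table-name' = 'table2_split',
      |  'username' = '<username>',
      |  'password' = '<password>'
      |)
      |""".stripMargin
}
```

With the Flink table named `view2`, the lateral top N query itself can stay unchanged.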

 

Thanks,

Dylan

 

From: godfrey he <[hidden email]>
Date: Friday, November 20, 2020 at 1:09 AM
To: Dylan Forciea <[hidden email]>
Cc: "[hidden email]" <[hidden email]>
Subject: Re: Lateral join not finding correlate variable

 

Hi Dylan,

 

I have reproduced your issue based on your code. Currently, Flink does not support this kind of nested correlated query pattern. I have created an issue to track this [1]. Thanks for reporting it and for your help.

 

 

Best,

Godfrey

 
