sql multisink explain result is more expensive than expected

classic Classic list List threaded Threaded
9 messages Options
Reply | Threaded
Open this post in threaded view
|

sql multisink explain result is more expensive than expected

gsavl
Hi

We're using multiple sinks in SQL on top of a view; the test case is:

"""java
  @Test
  def testJoinTemporalTableWithViewWithFilterPushDown(): Unit = {
    createLookupTable("LookupTableAsync1", new AsyncTableFunction1)

    util.addTable(
      """
        |CREATE TEMPORARY VIEW v_vvv AS
        |SELECT * FROM MyTable AS T
        |JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
        |ON T.a = D.id
        |""".stripMargin)

    val stmtSet = util.tableEnv.createStatementSet()

    val appendSink1 = util.createRetractTableSink(
      Array("a", "b","id","name"),
      Array(INT, STRING, INT, STRING))
    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
      "appendSink1", appendSink1)
    stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery("""
                                                              |SELECT a,b,id,name FROM v_vvv
                                                              |WHERE age = 10
                         """.stripMargin))

    stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery("""
                                                              |SELECT a,b,id,name FROM v_vvv
                                                              |WHERE age = 30
                                                            """.stripMargin))

//    util.verifyPlan(stmtSet)
    util.verifyExecPlan(stmtSet)
  }

/**
 * Stub async lookup function used by the reproduction.
 *
 * It exposes a single `eval` taking the result future and one `Integer` lookup key; the body
 * is empty, so the future is never completed here.
 */
class AsyncTableFunction1 extends AsyncTableFunction[RowData] {
  // Single-key signature only: there is no overload accepting a second lookup key.
  def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer): Unit = ()
}
"""

the optimized exec plan is

"""
Calc(select=[a, b])(reuse_id=[1])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])

LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b, id, name])
+- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[age=10, id=a], where=[(age = 10)], select=[a, b, id, name])
   +- Reused(reference_id=[1])

LegacySink(name=[`default_catalog`.`default_database`.`appendSink2`], fields=[a, b, id, name])
+- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[age=30, id=a], where=[(age = 30)], select=[a, b, id, name])
   +- Reused(reference_id=[1])
"""

I have 2 questions
1. The lookup table function is executed twice, which is very expensive.
2. The age filter is pushed down into LookupJoin as lookup=[age=10, id=a], which results in a function signature mismatch (the exception follows below).

org.apache.flink.table.api.ValidationException: Could not find an implementation method 'eval' in class 'org.apache.flink.table.planner.plan.utils.AsyncTableFunction1' for function 'default_catalog.default_database.LookupTableAsync1, source: [TestInvalidTemporalTable(id, name, age, ts)]' that matches the following signature:
void eval(java.util.concurrent.CompletableFuture, java.lang.Integer, java.lang.Integer)

Is the optimizer wrong, or am I?

Cooper.Luan

Reply | Threaded
Open this post in threaded view
|

Re: sql multisink explain result is more expensive than expected

gsavl
Should I post this on the dev mailing list instead?

On Mon, 7 Jun 2021 at 18:56 Luan Cooper <[hidden email]> wrote:
Hi

We're using multi sink in sql with view, the TestCase is

"""java
  @Test
  def testJoinTemporalTableWithViewWithFilterPushDown(): Unit = {
    createLookupTable("LookupTableAsync1", new AsyncTableFunction1)

    util.addTable(
      """
        |CREATE TEMPORARY VIEW v_vvv AS
        |SELECT * FROM MyTable AS T
        |JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
        |ON T.a = D.id
        |""".stripMargin)

    val stmtSet = util.tableEnv.createStatementSet()

    val appendSink1 = util.createRetractTableSink(
      Array("a", "b","id","name"),
      Array(INT, STRING, INT, STRING))
    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
      "appendSink1", appendSink1)
    stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery("""
                                                              |SELECT a,b,id,name FROM v_vvv
                                                              |WHERE age = 10
                         """.stripMargin))

    stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery("""
                                                              |SELECT a,b,id,name FROM v_vvv
                                                              |WHERE age = 30
                                                            """.stripMargin))

//    util.verifyPlan(stmtSet)
    util.verifyExecPlan(stmtSet)
  }

class AsyncTableFunction1 extends AsyncTableFunction[RowData] {
  def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer): Unit = {
  }
}
"""

the optimized exec plan is

"""
Calc(select=[a, b])(reuse_id=[1])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])

LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b, id, name])
+- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[age=10, id=a], where=[(age = 10)], select=[a, b, id, name])
   +- Reused(reference_id=[1])

LegacySink(name=[`default_catalog`.`default_database`.`appendSink2`], fields=[a, b, id, name])
+- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[age=30, id=a], where=[(age = 30)], select=[a, b, id, name])
   +- Reused(reference_id=[1])
"""

I have 2 questions
1. The lookup table function execute twice, which is very expensive
2. the age filter is push down to LookupJoin with lookup=[age=10, id=a], which result to function signature mismatch (exception follows blow)

org.apache.flink.table.api.ValidationException: Could not find an implementation method 'eval' in class 'org.apache.flink.table.planner.plan.utils.AsyncTableFunction1' for function 'default_catalog.default_database.LookupTableAsync1, source: [TestInvalidTemporalTable(id, name, age, ts)]' that matches the following signature:
void eval(java.util.concurrent.CompletableFuture, java.lang.Integer, java.lang.Integer)

Is the optimizer wrong or I'm wrong ?

Cooper.Luan

Reply | Threaded
Open this post in threaded view
|

Re: sql multisink explain result is more expensive than expected

JING ZHANG
Hi Cooper.Luan,
I have 2 questions
  1. The lookup table function execute twice, which is very expensive
  2. the age filter is push down to LookupJoin with lookup=[age=10, id=a], which result to function signature mismatch (exception follows blow)

image.png

If you need to reuse the LookupJoin as in the plan above, you can use `createTemporaryView` to register the LookupJoin result as a temporary view, with code like the following:
'''
  // Variant of the reproduction that registers the lookup-join query result through
  // `createTemporaryView` instead of a `CREATE TEMPORARY VIEW` statement, then feeds two
  // filtered INSERTs from that view into two separately registered sinks.
  @Test
  def testJoinTemporalTableWithViewWithFilterPushDown(): Unit = {
    createLookupTable("LookupTableAsync1", new AsyncTableFunction1)
    // util.addTable(
    // """
    // |CREATE TEMPORARY VIEW v_vvv AS
    // |SELECT * FROM MyTable AS T
    // |JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
    // |ON T.a = D.id
    // |""".stripMargin)
    val table = util.tableEnv.sqlQuery(
      """
        |SELECT * FROM MyTable AS T
        |JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
        |ON T.a = D.id
        |""".stripMargin)

    // Register the join result itself as the view both inserts read from.
    util.tableEnv.createTemporaryView("v_vvv", table)
    val stmtSet = util.tableEnv.createStatementSet()

    // Sink for the `age = 10` branch.
    val appendSink1 = util.createRetractTableSink(
      Array("a", "b","id","name"),
      Array(INT, STRING, INT, STRING))
    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
      "appendSink1", appendSink1)

    // Sink for the `age = 30` branch; same schema, registered under its own name.
    val appendSink2 = util.createRetractTableSink(
      Array("a", "b","id","name"),
      Array(INT, STRING, INT, STRING))
    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
      "appendSink2", appendSink2)

    stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery("""
        |SELECT a,b,id,name FROM v_vvv
        |WHERE age = 10
        |""".stripMargin))

    stmtSet.addInsert("appendSink2", util.tableEnv.sqlQuery("""
        |SELECT a,b,id,name FROM v_vvv
        |WHERE age = 30
        |""".stripMargin))

    // util.verifyPlan(stmtSet)
    util.verifyExecPlan(stmtSet)
  }'''

> org.apache.flink.table.api.ValidationException: Could not find an implementation method 'eval' in class 'org.apache.flink.table.planner.plan.utils.AsyncTableFunction1' for function 'default_catalog.default_database.LookupTableAsync1, source: [TestInvalidTemporalTable(id, name, age, ts)]' that matches the following signature:
void eval(java.util.concurrent.CompletableFuture, java.lang.Integer, java.lang.Integer)

Would you please provide the complete code that reproduces the exception? I could not reproduce it in my local environment.

Best regards,
JING ZHANG


Luan Cooper <[hidden email]> 于2021年6月8日周二 上午10:42写道:
should I post in Dev user list ?

On Mon, 7 Jun 2021 at 18:56 Luan Cooper <[hidden email]> wrote:
Hi

We're using multi sink in sql with view, the TestCase is

"""java
  @Test
  def testJoinTemporalTableWithViewWithFilterPushDown(): Unit = {
    createLookupTable("LookupTableAsync1", new AsyncTableFunction1)

    util.addTable(
      """
        |CREATE TEMPORARY VIEW v_vvv AS
        |SELECT * FROM MyTable AS T
        |JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
        |ON T.a = D.id
        |""".stripMargin)

    val stmtSet = util.tableEnv.createStatementSet()

    val appendSink1 = util.createRetractTableSink(
      Array("a", "b","id","name"),
      Array(INT, STRING, INT, STRING))
    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
      "appendSink1", appendSink1)
    stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery("""
                                                              |SELECT a,b,id,name FROM v_vvv
                                                              |WHERE age = 10
                         """.stripMargin))

    stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery("""
                                                              |SELECT a,b,id,name FROM v_vvv
                                                              |WHERE age = 30
                                                            """.stripMargin))

//    util.verifyPlan(stmtSet)
    util.verifyExecPlan(stmtSet)
  }

class AsyncTableFunction1 extends AsyncTableFunction[RowData] {
  def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer): Unit = {
  }
}
"""

the optimized exec plan is

"""
Calc(select=[a, b])(reuse_id=[1])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])

LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b, id, name])
+- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[age=10, id=a], where=[(age = 10)], select=[a, b, id, name])
   +- Reused(reference_id=[1])

LegacySink(name=[`default_catalog`.`default_database`.`appendSink2`], fields=[a, b, id, name])
+- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[age=30, id=a], where=[(age = 30)], select=[a, b, id, name])
   +- Reused(reference_id=[1])
"""

I have 2 questions
1. The lookup table function execute twice, which is very expensive
2. the age filter is push down to LookupJoin with lookup=[age=10, id=a], which result to function signature mismatch (exception follows blow)

org.apache.flink.table.api.ValidationException: Could not find an implementation method 'eval' in class 'org.apache.flink.table.planner.plan.utils.AsyncTableFunction1' for function 'default_catalog.default_database.LookupTableAsync1, source: [TestInvalidTemporalTable(id, name, age, ts)]' that matches the following signature:
void eval(java.util.concurrent.CompletableFuture, java.lang.Integer, java.lang.Integer)

Is the optimizer wrong or I'm wrong ?

Cooper.Luan

Reply | Threaded
Open this post in threaded view
|

Re: sql multisink explain result is more expensive than expected

gsavl

On Tue, Jun 8, 2021 at 1:50 PM JING ZHANG <[hidden email]> wrote:
Hi Cooper.Luan,
I have 2 questions
  1. The lookup table function execute twice, which is very expensive
  2. the age filter is push down to LookupJoin with lookup=[age=10, id=a], which result to function signature mismatch (exception follows blow)

image.png

If you need reuse LookupJoin like the above plan, you could use `createTemporaryView` to register LookupJoin result as a temp view, the code like the following:
'''
You're right, the multi-insert statement reuses the lookup result as expected.
#testJoinTemporalTableWithMultisinkWithFilterPushDown2 shows the right result:
image.png

but testJoinTemporalTableWithMultisinkWithFilterPushDown1 is a bad case
image.png
which results in a lookup function signature mismatch

diff between testJoinTemporalTableWithMultisinkWithFilterPushDown1 and testJoinTemporalTableWithMultisinkWithFilterPushDown2
testJoinTemporalTableWithMultisinkWithFilterPushDown1: statement set with 1 insert
testJoinTemporalTableWithMultisinkWithFilterPushDown2: statement set with 2 insert
  @Test
def testJoinTemporalTableWithViewWithFilterPushDown(): Unit = {
createLookupTable("LookupTableAsync1", new AsyncTableFunction1)
// util.addTable(
// """
// |CREATE TEMPORARY VIEW v_vvv AS
// |SELECT * FROM MyTable AS T
// |JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
// |ON T.a = D.id
// |""".stripMargin)
val table = util.tableEnv.sqlQuery(
"""
|SELECT * FROM MyTable AS T
|JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
|ON T.a = D.id
|""".stripMargin)

util.tableEnv.createTemporaryView("v_vvv", table)
val stmtSet = util.tableEnv.createStatementSet()

val appendSink1 = util.createRetractTableSink(
Array("a", "b","id","name"),
Array(INT, STRING, INT, STRING))
util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
"appendSink1", appendSink1)
stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery("""
|SELECT a,b,id,name FROM v_vvv
|WHERE age = 10
""".stripMargin))

stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery("""
|SELECT a,b,id,name FROM v_vvv
|WHERE age = 30
""".stripMargin))

// util.verifyPlan(stmtSet)
util.verifyExecPlan(stmtSet)
  }'''

> org.apache.flink.table.api.ValidationException: Could not find an implementation method 'eval' in class 'org.apache.flink.table.planner.plan.utils.AsyncTableFunction1' for function 'default_catalog.default_database.LookupTableAsync1, source: [TestInvalidTemporalTable(id, name, age, ts)]' that matches the following signature:
void eval(java.util.concurrent.CompletableFuture, java.lang.Integer, java.lang.Integer)

Would you please provide complete code to produce the exception, I didn't find the exception in my local environment.

Best regards,
JING ZHANG


Luan Cooper <[hidden email]> 于2021年6月8日周二 上午10:42写道:
should I post in Dev user list ?

On Mon, 7 Jun 2021 at 18:56 Luan Cooper <[hidden email]> wrote:
Hi

We're using multi sink in sql with view, the TestCase is

"""java
  @Test
  def testJoinTemporalTableWithViewWithFilterPushDown(): Unit = {
    createLookupTable("LookupTableAsync1", new AsyncTableFunction1)

    util.addTable(
      """
        |CREATE TEMPORARY VIEW v_vvv AS
        |SELECT * FROM MyTable AS T
        |JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
        |ON T.a = D.id
        |""".stripMargin)

    val stmtSet = util.tableEnv.createStatementSet()

    val appendSink1 = util.createRetractTableSink(
      Array("a", "b","id","name"),
      Array(INT, STRING, INT, STRING))
    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
      "appendSink1", appendSink1)
    stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery("""
                                                              |SELECT a,b,id,name FROM v_vvv
                                                              |WHERE age = 10
                         """.stripMargin))

    stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery("""
                                                              |SELECT a,b,id,name FROM v_vvv
                                                              |WHERE age = 30
                                                            """.stripMargin))

//    util.verifyPlan(stmtSet)
    util.verifyExecPlan(stmtSet)
  }

class AsyncTableFunction1 extends AsyncTableFunction[RowData] {
  def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer): Unit = {
  }
}
"""

the optimized exec plan is

"""
Calc(select=[a, b])(reuse_id=[1])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])

LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b, id, name])
+- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[age=10, id=a], where=[(age = 10)], select=[a, b, id, name])
   +- Reused(reference_id=[1])

LegacySink(name=[`default_catalog`.`default_database`.`appendSink2`], fields=[a, b, id, name])
+- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[age=30, id=a], where=[(age = 30)], select=[a, b, id, name])
   +- Reused(reference_id=[1])
"""

I have 2 questions
1. The lookup table function execute twice, which is very expensive
2. the age filter is push down to LookupJoin with lookup=[age=10, id=a], which result to function signature mismatch (exception follows blow)

org.apache.flink.table.api.ValidationException: Could not find an implementation method 'eval' in class 'org.apache.flink.table.planner.plan.utils.AsyncTableFunction1' for function 'default_catalog.default_database.LookupTableAsync1, source: [TestInvalidTemporalTable(id, name, age, ts)]' that matches the following signature:
void eval(java.util.concurrent.CompletableFuture, java.lang.Integer, java.lang.Integer)

Is the optimizer wrong or I'm wrong ?

Cooper.Luan

Reply | Threaded
Open this post in threaded view
|

Re: sql multisink explain result is more expensive than expected

gsavl
I should have sent a patch instead of a git repo.
Sorry for the inconvenience.


On Tue, Jun 8, 2021 at 2:49 PM Luan Cooper <[hidden email]> wrote:

On Tue, Jun 8, 2021 at 1:50 PM JING ZHANG <[hidden email]> wrote:
Hi Cooper.Luan,
I have 2 questions
  1. The lookup table function execute twice, which is very expensive
  2. the age filter is push down to LookupJoin with lookup=[age=10, id=a], which result to function signature mismatch (exception follows blow)

image.png

If you need reuse LookupJoin like the above plan, you could use `createTemporaryView` to register LookupJoin result as a temp view, the code like the following:
'''
you're right, the multi insert statement reuse lookup result as expected
#testJoinTemporalTableWithMultisinkWithFilterPushDown2 show right result
image.png

but testJoinTemporalTableWithMultisinkWithFilterPushDown1 is a bad case
image.png
which result to lookup function signature mismatch

diff between testJoinTemporalTableWithMultisinkWithFilterPushDown1 and testJoinTemporalTableWithMultisinkWithFilterPushDown2
testJoinTemporalTableWithMultisinkWithFilterPushDown1: statement set with 1 insert
testJoinTemporalTableWithMultisinkWithFilterPushDown2: statement set with 2 insert
  @Test
def testJoinTemporalTableWithViewWithFilterPushDown(): Unit = {
createLookupTable("LookupTableAsync1", new AsyncTableFunction1)
// util.addTable(
// """
// |CREATE TEMPORARY VIEW v_vvv AS
// |SELECT * FROM MyTable AS T
// |JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
// |ON T.a = D.id
// |""".stripMargin)
val table = util.tableEnv.sqlQuery(
"""
|SELECT * FROM MyTable AS T
|JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
|ON T.a = D.id
|""".stripMargin)

util.tableEnv.createTemporaryView("v_vvv", table)
val stmtSet = util.tableEnv.createStatementSet()

val appendSink1 = util.createRetractTableSink(
Array("a", "b","id","name"),
Array(INT, STRING, INT, STRING))
util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
"appendSink1", appendSink1)
stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery("""
|SELECT a,b,id,name FROM v_vvv
|WHERE age = 10
""".stripMargin))

stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery("""
|SELECT a,b,id,name FROM v_vvv
|WHERE age = 30
""".stripMargin))

// util.verifyPlan(stmtSet)
util.verifyExecPlan(stmtSet)
  }'''

> org.apache.flink.table.api.ValidationException: Could not find an implementation method 'eval' in class 'org.apache.flink.table.planner.plan.utils.AsyncTableFunction1' for function 'default_catalog.default_database.LookupTableAsync1, source: [TestInvalidTemporalTable(id, name, age, ts)]' that matches the following signature:
void eval(java.util.concurrent.CompletableFuture, java.lang.Integer, java.lang.Integer)

Would you please provide complete code to produce the exception, I didn't find the exception in my local environment.

Best regards,
JING ZHANG


Luan Cooper <[hidden email]> 于2021年6月8日周二 上午10:42写道:
should I post in Dev user list ?

On Mon, 7 Jun 2021 at 18:56 Luan Cooper <[hidden email]> wrote:
Hi

We're using multi sink in sql with view, the TestCase is

"""java
  @Test
  def testJoinTemporalTableWithViewWithFilterPushDown(): Unit = {
    createLookupTable("LookupTableAsync1", new AsyncTableFunction1)

    util.addTable(
      """
        |CREATE TEMPORARY VIEW v_vvv AS
        |SELECT * FROM MyTable AS T
        |JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
        |ON T.a = D.id
        |""".stripMargin)

    val stmtSet = util.tableEnv.createStatementSet()

    val appendSink1 = util.createRetractTableSink(
      Array("a", "b","id","name"),
      Array(INT, STRING, INT, STRING))
    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
      "appendSink1", appendSink1)
    stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery("""
                                                              |SELECT a,b,id,name FROM v_vvv
                                                              |WHERE age = 10
                         """.stripMargin))

    stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery("""
                                                              |SELECT a,b,id,name FROM v_vvv
                                                              |WHERE age = 30
                                                            """.stripMargin))

//    util.verifyPlan(stmtSet)
    util.verifyExecPlan(stmtSet)
  }

class AsyncTableFunction1 extends AsyncTableFunction[RowData] {
  def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer): Unit = {
  }
}
"""

the optimized exec plan is

"""
Calc(select=[a, b])(reuse_id=[1])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])

LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b, id, name])
+- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[age=10, id=a], where=[(age = 10)], select=[a, b, id, name])
   +- Reused(reference_id=[1])

LegacySink(name=[`default_catalog`.`default_database`.`appendSink2`], fields=[a, b, id, name])
+- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[age=30, id=a], where=[(age = 30)], select=[a, b, id, name])
   +- Reused(reference_id=[1])
"""

I have 2 questions
1. The lookup table function execute twice, which is very expensive
2. the age filter is push down to LookupJoin with lookup=[age=10, id=a], which result to function signature mismatch (exception follows blow)

org.apache.flink.table.api.ValidationException: Could not find an implementation method 'eval' in class 'org.apache.flink.table.planner.plan.utils.AsyncTableFunction1' for function 'default_catalog.default_database.LookupTableAsync1, source: [TestInvalidTemporalTable(id, name, age, ts)]' that matches the following signature:
void eval(java.util.concurrent.CompletableFuture, java.lang.Integer, java.lang.Integer)

Is the optimizer wrong or I'm wrong ?

Cooper.Luan


0001-try-to-produce-lookup-join-filter-pushdown-expensive.patch (13K) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: sql multisink explain result is more expensive than expected

JING ZHANG
Hi Cooper.Luan,
Sorry for the late response. I applied your patch locally.
> 1. The lookup table function execute twice, which is very expensive
As mentioned before, using `createTemporaryView` as in `testJoinTemporalTableWithMultisinkWithFilterPushDown2` lets the LookupJoin be reused.

Then why does this work?
In Flink, there is a SubGraphBasedOptimizer which decomposes the RelNode DAG into multiple common sub-graphs; each sub-graph is a tree and can be optimized independently by the Calcite optimizer.
In `testJoinTemporalTableWithMultisinkWithFilterPushDown2`, the RelNodes are decomposed into 3 sub-graphs. The first sub-graph contains the LookupJoin, the second contains sink1, and the third contains sink2. Those three sub-graphs are optimized by the Calcite planner independently.

> 2. Why does SubGraphBasedOptimizer not seem to work for `testJoinTemporalTableWithMultisinkWithFilterPushDown1`?
StreamCommonSubGraphBasedOptimizer skips the decomposition when there is only one sink block, because the whole RelNode DAG is then a single RelNode tree.
So `LookupJoin` and 'Calc(Filter: age=10)' belong to the same sub-graph.

> 3. After the push-down to LookupJoin with lookup=[age=10, id=a], the result is a function signature mismatch (the exception follows below)
After 'age=10' is pushed down, the optimizer looks for a method that matches two lookup keys, 'age' and 'id'.
We could improve this case to avoid throwing an exception: if no exactly matching method is found, load all records from the right table that match 'id=a', then filter the output by 'age=10' afterwards. I will open a JIRA for this improvement.
However, it is better if you can add a method that matches void eval(java.util.concurrent.CompletableFuture, java.lang.Integer, java.lang.Integer), because that is equivalent to pushing the predicate down to the right table, which reduces the amount of data transferred.

Best regards,
JING ZHANG

Luan Cooper <[hidden email]> 于2021年6月9日周三 下午2:17写道:
I should use patch instead of git repo
sorry for the inconvenience


On Tue, Jun 8, 2021 at 2:49 PM Luan Cooper <[hidden email]> wrote:

On Tue, Jun 8, 2021 at 1:50 PM JING ZHANG <[hidden email]> wrote:
Hi Cooper.Luan,
I have 2 questions
  1. The lookup table function execute twice, which is very expensive
  2. the age filter is push down to LookupJoin with lookup=[age=10, id=a], which result to function signature mismatch (exception follows blow)

image.png

If you need reuse LookupJoin like the above plan, you could use `createTemporaryView` to register LookupJoin result as a temp view, the code like the following:
'''
you're right, the multi insert statement reuse lookup result as expected
#testJoinTemporalTableWithMultisinkWithFilterPushDown2 show right result
image.png

but testJoinTemporalTableWithMultisinkWithFilterPushDown1 is a bad case
image.png
which result to lookup function signature mismatch

diff between testJoinTemporalTableWithMultisinkWithFilterPushDown1 and testJoinTemporalTableWithMultisinkWithFilterPushDown2
testJoinTemporalTableWithMultisinkWithFilterPushDown1: statement set with 1 insert
testJoinTemporalTableWithMultisinkWithFilterPushDown2: statement set with 2 insert
  @Test
def testJoinTemporalTableWithViewWithFilterPushDown(): Unit = {
createLookupTable("LookupTableAsync1", new AsyncTableFunction1)
// util.addTable(
// """
// |CREATE TEMPORARY VIEW v_vvv AS
// |SELECT * FROM MyTable AS T
// |JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
// |ON T.a = D.id
// |""".stripMargin)
val table = util.tableEnv.sqlQuery(
"""
|SELECT * FROM MyTable AS T
|JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
|ON T.a = D.id
|""".stripMargin)

util.tableEnv.createTemporaryView("v_vvv", table)
val stmtSet = util.tableEnv.createStatementSet()

val appendSink1 = util.createRetractTableSink(
Array("a", "b","id","name"),
Array(INT, STRING, INT, STRING))
util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
"appendSink1", appendSink1)
stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery("""
|SELECT a,b,id,name FROM v_vvv
|WHERE age = 10
""".stripMargin))

stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery("""
|SELECT a,b,id,name FROM v_vvv
|WHERE age = 30
""".stripMargin))

// util.verifyPlan(stmtSet)
util.verifyExecPlan(stmtSet)
  }'''

> org.apache.flink.table.api.ValidationException: Could not find an implementation method 'eval' in class 'org.apache.flink.table.planner.plan.utils.AsyncTableFunction1' for function 'default_catalog.default_database.LookupTableAsync1, source: [TestInvalidTemporalTable(id, name, age, ts)]' that matches the following signature:
void eval(java.util.concurrent.CompletableFuture, java.lang.Integer, java.lang.Integer)

Would you please provide complete code to produce the exception, I didn't find the exception in my local environment.

Best regards,
JING ZHANG


Luan Cooper <[hidden email]> 于2021年6月8日周二 上午10:42写道:
should I post in Dev user list ?

On Mon, 7 Jun 2021 at 18:56 Luan Cooper <[hidden email]> wrote:
Hi

We're using multi sink in sql with view, the TestCase is

"""java
  @Test
  def testJoinTemporalTableWithViewWithFilterPushDown(): Unit = {
    createLookupTable("LookupTableAsync1", new AsyncTableFunction1)

    util.addTable(
      """
        |CREATE TEMPORARY VIEW v_vvv AS
        |SELECT * FROM MyTable AS T
        |JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
        |ON T.a = D.id
        |""".stripMargin)

    val stmtSet = util.tableEnv.createStatementSet()

    val appendSink1 = util.createRetractTableSink(
      Array("a", "b","id","name"),
      Array(INT, STRING, INT, STRING))
    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
      "appendSink1", appendSink1)
    stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery("""
                                                              |SELECT a,b,id,name FROM v_vvv
                                                              |WHERE age = 10
                         """.stripMargin))

    stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery("""
                                                              |SELECT a,b,id,name FROM v_vvv
                                                              |WHERE age = 30
                                                            """.stripMargin))

//    util.verifyPlan(stmtSet)
    util.verifyExecPlan(stmtSet)
  }

class AsyncTableFunction1 extends AsyncTableFunction[RowData] {
  def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer): Unit = {
  }
}
"""

the optimized exec plan is

"""
Calc(select=[a, b])(reuse_id=[1])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])

LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b, id, name])
+- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[age=10, id=a], where=[(age = 10)], select=[a, b, id, name])
   +- Reused(reference_id=[1])

LegacySink(name=[`default_catalog`.`default_database`.`appendSink2`], fields=[a, b, id, name])
+- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[age=30, id=a], where=[(age = 30)], select=[a, b, id, name])
   +- Reused(reference_id=[1])
"""

I have 2 questions
1. The lookup table function execute twice, which is very expensive
2. the age filter is push down to LookupJoin with lookup=[age=10, id=a], which result to function signature mismatch (exception follows blow)

org.apache.flink.table.api.ValidationException: Could not find an implementation method 'eval' in class 'org.apache.flink.table.planner.plan.utils.AsyncTableFunction1' for function 'default_catalog.default_database.LookupTableAsync1, source: [TestInvalidTemporalTable(id, name, age, ts)]' that matches the following signature:
void eval(java.util.concurrent.CompletableFuture, java.lang.Integer, java.lang.Integer)

Is the optimizer wrong, or am I wrong?

Cooper.Luan

Reply | Threaded
Open this post in threaded view
|

Re: sql multisink explain result is more expensive than expected

gsavl
Thanks for your explanation, it helps


On Wed, Jun 9, 2021 at 3:50 PM JING ZHANG <[hidden email]> wrote:
Hi Cooper.Luan,
Sorry for the late response. I applied your patch locally.
> 1. The lookup table function execute twice, which is very expensive
As mentioned before, using `createTemporaryView` as in `testJoinTemporalTableWithMultisinkWithFilterPushDown2` lets the LookupJoin be reused.

Then why does this work?
In Flink, there is a SubGraphBasedOptimizer which decomposes the RelNode DAG into multiple common sub-graphs; each sub-graph is a tree and can be optimized independently by the Calcite optimizer.
In `testJoinTemporalTableWithMultisinkWithFilterPushDown2`, the RelNodes are decomposed into 3 sub-graphs: the first contains the LookupJoin, the second contains sink1, and the third contains sink2. These three sub-graphs are optimized by the Calcite planner independently.

> 2. Why SubGraphBasedOptimizer seem not work for `testJoinTemporalTableWithMultisinkWithFilterPushDown1` ?
StreamCommonSubGraphBasedOptimizer skips the decomposition if there is only one sink block, because the whole RelNode DAG is then a single RelNode tree.
So `LookupJoin` and 'Calc(Filter: age=10)' belong to the same sub-graph.

> 3. After push down to LookupJoin with lookup=[age=10, id=a], result to function signature mismatch (exception follows blow)
After pushing down 'age=10', the optimizer wants to find a method that matches the two lookup keys, 'age' and 'id'.
We could improve this case to avoid throwing an exception: if the best-matching method is not found, load all records from the right table that match 'id=a', then filter the output by 'age=10' afterwards. I will create a JIRA for this improvement.
However, it is better if you add a method that matches void eval(java.util.concurrent.CompletableFuture, java.lang.Integer, java.lang.Integer), because that is equivalent to pushing the predicate down to the right table, which reduces the amount of data transferred.

Best regards,
JING ZHANG

Luan Cooper <[hidden email]> 于2021年6月9日周三 下午2:17写道:
I should use patch instead of git repo
sorry for the inconvenience


On Tue, Jun 8, 2021 at 2:49 PM Luan Cooper <[hidden email]> wrote:

On Tue, Jun 8, 2021 at 1:50 PM JING ZHANG <[hidden email]> wrote:
Hi Cooper.Luan,
I have 2 questions
  1. The lookup table function execute twice, which is very expensive
  2. the age filter is push down to LookupJoin with lookup=[age=10, id=a], which result to function signature mismatch (exception follows blow)

image.png

If you need to reuse the LookupJoin as in the above plan, you could use `createTemporaryView` to register the LookupJoin result as a temporary view, with code like the following:
'''
You're right, the multi-insert statement reuses the lookup result as expected.
#testJoinTemporalTableWithMultisinkWithFilterPushDown2 shows the right result.
image.png

but testJoinTemporalTableWithMultisinkWithFilterPushDown1 is a bad case
image.png
which results in a lookup function signature mismatch

diff between testJoinTemporalTableWithMultisinkWithFilterPushDown1 and testJoinTemporalTableWithMultisinkWithFilterPushDown2
testJoinTemporalTableWithMultisinkWithFilterPushDown1: statement set with 1 insert
testJoinTemporalTableWithMultisinkWithFilterPushDown2: statement set with 2 insert
  @Test
def testJoinTemporalTableWithViewWithFilterPushDown(): Unit = {
createLookupTable("LookupTableAsync1", new AsyncTableFunction1)
// util.addTable(
// """
// |CREATE TEMPORARY VIEW v_vvv AS
// |SELECT * FROM MyTable AS T
// |JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
// |ON T.a = D.id
// |""".stripMargin)
val table = util.tableEnv.sqlQuery(
"""
|SELECT * FROM MyTable AS T
|JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
|ON T.a = D.id
|""".stripMargin)

util.tableEnv.createTemporaryView("v_vvv", table)
val stmtSet = util.tableEnv.createStatementSet()

val appendSink1 = util.createRetractTableSink(
Array("a", "b","id","name"),
Array(INT, STRING, INT, STRING))
util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
"appendSink1", appendSink1)
stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery("""
|SELECT a,b,id,name FROM v_vvv
|WHERE age = 10
""".stripMargin))

stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery("""
|SELECT a,b,id,name FROM v_vvv
|WHERE age = 30
""".stripMargin))

// util.verifyPlan(stmtSet)
util.verifyExecPlan(stmtSet)
  }'''

> org.apache.flink.table.api.ValidationException: Could not find an implementation method 'eval' in class 'org.apache.flink.table.planner.plan.utils.AsyncTableFunction1' for function 'default_catalog.default_database.LookupTableAsync1, source: [TestInvalidTemporalTable(id, name, age, ts)]' that matches the following signature:
void eval(java.util.concurrent.CompletableFuture, java.lang.Integer, java.lang.Integer)

Would you please provide the complete code to reproduce the exception? I could not reproduce it in my local environment.

Best regards,
JING ZHANG


Luan Cooper <[hidden email]> 于2021年6月8日周二 上午10:42写道:
should I post in Dev user list ?

On Mon, 7 Jun 2021 at 18:56 Luan Cooper <[hidden email]> wrote:
Hi

We're using multi sink in sql with view, the TestCase is

"""java
  @Test
  def testJoinTemporalTableWithViewWithFilterPushDown(): Unit = {
    createLookupTable("LookupTableAsync1", new AsyncTableFunction1)

    util.addTable(
      """
        |CREATE TEMPORARY VIEW v_vvv AS
        |SELECT * FROM MyTable AS T
        |JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
        |ON T.a = D.id
        |""".stripMargin)

    val stmtSet = util.tableEnv.createStatementSet()

    val appendSink1 = util.createRetractTableSink(
      Array("a", "b","id","name"),
      Array(INT, STRING, INT, STRING))
    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
      "appendSink1", appendSink1)
    stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery("""
                                                              |SELECT a,b,id,name FROM v_vvv
                                                              |WHERE age = 10
                         """.stripMargin))

    stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery("""
                                                              |SELECT a,b,id,name FROM v_vvv
                                                              |WHERE age = 30
                                                            """.stripMargin))

//    util.verifyPlan(stmtSet)
    util.verifyExecPlan(stmtSet)
  }

class AsyncTableFunction1 extends AsyncTableFunction[RowData] {
  def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer): Unit = {
  }
}
"""

the optimized exec plan is

"""
Calc(select=[a, b])(reuse_id=[1])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])

LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b, id, name])
+- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[age=10, id=a], where=[(age = 10)], select=[a, b, id, name])
   +- Reused(reference_id=[1])

LegacySink(name=[`default_catalog`.`default_database`.`appendSink2`], fields=[a, b, id, name])
+- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[age=30, id=a], where=[(age = 30)], select=[a, b, id, name])
   +- Reused(reference_id=[1])
"""

I have 2 questions
1. The lookup table function execute twice, which is very expensive
2. the age filter is push down to LookupJoin with lookup=[age=10, id=a], which result to function signature mismatch (exception follows blow)

org.apache.flink.table.api.ValidationException: Could not find an implementation method 'eval' in class 'org.apache.flink.table.planner.plan.utils.AsyncTableFunction1' for function 'default_catalog.default_database.LookupTableAsync1, source: [TestInvalidTemporalTable(id, name, age, ts)]' that matches the following signature:
void eval(java.util.concurrent.CompletableFuture, java.lang.Integer, java.lang.Integer)

Is the optimizer wrong or I'm wrong ?

Cooper.Luan

Reply | Threaded
Open this post in threaded view
|

Re: sql multisink explain result is more expensive than expected

JING ZHANG
Great, thanks for fixing the issue. I've attached this email to the JIRA.

Luan Cooper <[hidden email]> 于2021年6月10日周四 下午2:28写道:
Thanks for your explanation, it helps


On Wed, Jun 9, 2021 at 3:50 PM JING ZHANG <[hidden email]> wrote:
Hi Cooper.Luan,
Sorry for the late response. I apply your patch locally.
> 1. The lookup table function execute twice, which is very expensive
As mentioned before, use `createTemporaryView` like `testJoinTemporalTableWithMultisinkWithFilterPushDown2`, could reuse LookupJoin.

Then why does this work?
In Flink, there is a SubGraphBasedOptimizer which decomposes the RelNode DAG into multiple common sub-graphs; each sub-graph is a tree and can be optimized independently by the Calcite optimizer.
In `testJoinTemporalTableWithMultisinkWithFilterPushDown2`, the RelNodes are decomposed into 3 sub-graphs: the first contains the LookupJoin, the second contains sink1, and the third contains sink2. These three sub-graphs are optimized by the Calcite planner independently.

> 2. Why SubGraphBasedOptimizer seem not work for `testJoinTemporalTableWithMultisinkWithFilterPushDown1` ?
StreamCommonSubGraphBasedOptimizer skips the decomposition if there is only one sink block, because the whole RelNode DAG is then a single RelNode tree.
So `LookupJoin` and 'Calc(Filter: age=10)' belong to the same sub-graph.

> 3. After push down to LookupJoin with lookup=[age=10, id=a], result to function signature mismatch (exception follows blow)
After push down 'age=10', optimizer wants to find a method that matches two lookup keys, 'age, id' . 
We could  improve the case to avoid throwing an exception: if not found the most match case, try to load all records from the right table that matches 'id=a', then filter output by 'age=10' later. I would add a JIRA for this improvement.
However, it is better if you could add a method that matches void eval(java.util.concurrent.CompletableFuture, java.lang.Integer, java.lang.Integer), because it is equivalent with push down predicate to right table which would reduce transfer data.

Best regards,
JING ZHANG

Luan Cooper <[hidden email]> 于2021年6月9日周三 下午2:17写道:
I should use patch instead of git repo
sorry for the inconvenience


On Tue, Jun 8, 2021 at 2:49 PM Luan Cooper <[hidden email]> wrote:

On Tue, Jun 8, 2021 at 1:50 PM JING ZHANG <[hidden email]> wrote:
Hi Cooper.Luan,
I have 2 questions
  1. The lookup table function execute twice, which is very expensive
  2. the age filter is push down to LookupJoin with lookup=[age=10, id=a], which result to function signature mismatch (exception follows blow)

image.png

If you need reuse LookupJoin like the above plan, you could use `createTemporaryView` to register LookupJoin result as a temp view, the code like the following:
'''
you're right, the multi insert statement reuse lookup result as expected
#testJoinTemporalTableWithMultisinkWithFilterPushDown2 show right result
image.png

but testJoinTemporalTableWithMultisinkWithFilterPushDown1 is a bad case
image.png
which result to lookup function signature mismatch

diff between testJoinTemporalTableWithMultisinkWithFilterPushDown1 and testJoinTemporalTableWithMultisinkWithFilterPushDown2
testJoinTemporalTableWithMultisinkWithFilterPushDown1: statement set with 1 insert
testJoinTemporalTableWithMultisinkWithFilterPushDown2: statement set with 2 insert
  @Test
def testJoinTemporalTableWithViewWithFilterPushDown(): Unit = {
createLookupTable("LookupTableAsync1", new AsyncTableFunction1)
// util.addTable(
// """
// |CREATE TEMPORARY VIEW v_vvv AS
// |SELECT * FROM MyTable AS T
// |JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
// |ON T.a = D.id
// |""".stripMargin)
val table = util.tableEnv.sqlQuery(
"""
|SELECT * FROM MyTable AS T
|JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
|ON T.a = D.id
|""".stripMargin)

util.tableEnv.createTemporaryView("v_vvv", table)
val stmtSet = util.tableEnv.createStatementSet()

val appendSink1 = util.createRetractTableSink(
Array("a", "b","id","name"),
Array(INT, STRING, INT, STRING))
util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
"appendSink1", appendSink1)
stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery("""
|SELECT a,b,id,name FROM v_vvv
|WHERE age = 10
""".stripMargin))

stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery("""
|SELECT a,b,id,name FROM v_vvv
|WHERE age = 30
""".stripMargin))

// util.verifyPlan(stmtSet)
util.verifyExecPlan(stmtSet)
  }'''

> org.apache.flink.table.api.ValidationException: Could not find an implementation method 'eval' in class 'org.apache.flink.table.planner.plan.utils.AsyncTableFunction1' for function 'default_catalog.default_database.LookupTableAsync1, source: [TestInvalidTemporalTable(id, name, age, ts)]' that matches the following signature:
void eval(java.util.concurrent.CompletableFuture, java.lang.Integer, java.lang.Integer)

Would you please provide complete code to produce the exception, I didn't find the exception in my local environment.

Best regards,
JING ZHANG


Luan Cooper <[hidden email]> 于2021年6月8日周二 上午10:42写道:
should I post in Dev user list ?

On Mon, 7 Jun 2021 at 18:56 Luan Cooper <[hidden email]> wrote:
Hi

We're using multi sink in sql with view, the TestCase is

"""java
  @Test
  def testJoinTemporalTableWithViewWithFilterPushDown(): Unit = {
    createLookupTable("LookupTableAsync1", new AsyncTableFunction1)

    util.addTable(
      """
        |CREATE TEMPORARY VIEW v_vvv AS
        |SELECT * FROM MyTable AS T
        |JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
        |ON T.a = D.id
        |""".stripMargin)

    val stmtSet = util.tableEnv.createStatementSet()

    val appendSink1 = util.createRetractTableSink(
      Array("a", "b","id","name"),
      Array(INT, STRING, INT, STRING))
    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
      "appendSink1", appendSink1)
    stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery("""
                                                              |SELECT a,b,id,name FROM v_vvv
                                                              |WHERE age = 10
                         """.stripMargin))

    stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery("""
                                                              |SELECT a,b,id,name FROM v_vvv
                                                              |WHERE age = 30
                                                            """.stripMargin))

//    util.verifyPlan(stmtSet)
    util.verifyExecPlan(stmtSet)
  }

class AsyncTableFunction1 extends AsyncTableFunction[RowData] {
  def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer): Unit = {
  }
}
"""

the optimized exec plan is

"""
Calc(select=[a, b])(reuse_id=[1])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])

LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b, id, name])
+- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[age=10, id=a], where=[(age = 10)], select=[a, b, id, name])
   +- Reused(reference_id=[1])

LegacySink(name=[`default_catalog`.`default_database`.`appendSink2`], fields=[a, b, id, name])
+- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[age=30, id=a], where=[(age = 30)], select=[a, b, id, name])
   +- Reused(reference_id=[1])
"""

I have 2 questions
1. The lookup table function execute twice, which is very expensive
2. the age filter is push down to LookupJoin with lookup=[age=10, id=a], which result to function signature mismatch (exception follows blow)

org.apache.flink.table.api.ValidationException: Could not find an implementation method 'eval' in class 'org.apache.flink.table.planner.plan.utils.AsyncTableFunction1' for function 'default_catalog.default_database.LookupTableAsync1, source: [TestInvalidTemporalTable(id, name, age, ts)]' that matches the following signature:
void eval(java.util.concurrent.CompletableFuture, java.lang.Integer, java.lang.Integer)

Is the optimizer wrong or I'm wrong ?

Cooper.Luan

Reply | Threaded
Open this post in threaded view
|

Re: sql multisink explain result is more expensive than expected

JING ZHANG

Hi Cooper Luan,

As we discussed before, the root cause of the exception is that LookupJoin wants to find a method that matches two lookup keys, 'age' and 'id', after the optimizer pushes age=10 down into LookupJoin.

There is a possible improvement to avoid throwing an exception: if no method matching two lookup keys is found, try to match a method with one lookup key. The dimension table then returns all records that match 'id=a', and the output is filtered by 'age=10' in a later Calc.

But for this to work, we need to know whether the single argument of the `eval` method represents 'age' or 'id'. This hint is missing from the current API. As discussed with Jark Wu, we think this improvement would involve refactoring `LookupTableSource`.

We prefer to let the user handle this case by adding an `eval` method that matches the lookup keys, for example:

// first solution with one eval method with variable arguments length
@SerialVersionUID(1L)
class AsyncTableFunction1 extends AsyncTableFunction[RowData] {

  @varargs
  def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer*): Unit = {
  }
}

// second solution with multiple eval method, each method with fixed arguments length
@SerialVersionUID(1L)
class AsyncTableFunction1 extends AsyncTableFunction[RowData] {

  def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer): Unit = {
  }

  def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer, b: Integer): Unit = {
  }
}

What do you think, Cooper Luan?


JING ZHANG <[hidden email]> 于2021年6月10日周四 下午4:17写道:
Great, thanks for fixing the issue. I've attached this email to the JIRA.

Luan Cooper <[hidden email]> 于2021年6月10日周四 下午2:28写道:
Thanks for your explanation, it helps


On Wed, Jun 9, 2021 at 3:50 PM JING ZHANG <[hidden email]> wrote:
Hi Cooper.Luan,
Sorry for the late response. I apply your patch locally.
> 1. The lookup table function execute twice, which is very expensive
As mentioned before, use `createTemporaryView` like `testJoinTemporalTableWithMultisinkWithFilterPushDown2`, could reuse LookupJoin.

Then why does this work?
In Flink, there is a SubGraphBasedOptimizer which decomposes the RelNode DAG into multiple common sub-graphs; each sub-graph is a tree and can be optimized independently by the Calcite optimizer.
In `testJoinTemporalTableWithMultisinkWithFilterPushDown2`, the RelNodes are decomposed into 3 sub-graphs: the first contains the LookupJoin, the second contains sink1, and the third contains sink2. These three sub-graphs are optimized by the Calcite planner independently.

> 2. Why SubGraphBasedOptimizer seem not work for `testJoinTemporalTableWithMultisinkWithFilterPushDown1` ?
StreamCommonSubGraphBasedOptimizer skips the decomposition if there is only one sink block, because the whole RelNode DAG is then a single RelNode tree.
So `LookupJoin` and 'Calc(Filter: age=10)' belong to the same sub-graph.

> 3. After push down to LookupJoin with lookup=[age=10, id=a], result to function signature mismatch (exception follows blow)
After push down 'age=10', optimizer wants to find a method that matches two lookup keys, 'age, id' . 
We could  improve the case to avoid throwing an exception: if not found the most match case, try to load all records from the right table that matches 'id=a', then filter output by 'age=10' later. I would add a JIRA for this improvement.
However, it is better if you could add a method that matches void eval(java.util.concurrent.CompletableFuture, java.lang.Integer, java.lang.Integer), because it is equivalent with push down predicate to right table which would reduce transfer data.

Best regards,
JING ZHANG

Luan Cooper <[hidden email]> 于2021年6月9日周三 下午2:17写道:
I should use patch instead of git repo
sorry for the inconvenience


On Tue, Jun 8, 2021 at 2:49 PM Luan Cooper <[hidden email]> wrote:

On Tue, Jun 8, 2021 at 1:50 PM JING ZHANG <[hidden email]> wrote:
Hi Cooper.Luan,
I have 2 questions
  1. The lookup table function execute twice, which is very expensive
  2. the age filter is push down to LookupJoin with lookup=[age=10, id=a], which result to function signature mismatch (exception follows blow)

image.png

If you need reuse LookupJoin like the above plan, you could use `createTemporaryView` to register LookupJoin result as a temp view, the code like the following:
'''
you're right, the multi insert statement reuse lookup result as expected
#testJoinTemporalTableWithMultisinkWithFilterPushDown2 show right result
image.png

but testJoinTemporalTableWithMultisinkWithFilterPushDown1 is a bad case
image.png
which result to lookup function signature mismatch

diff between testJoinTemporalTableWithMultisinkWithFilterPushDown1 and testJoinTemporalTableWithMultisinkWithFilterPushDown2
testJoinTemporalTableWithMultisinkWithFilterPushDown1: statement set with 1 insert
testJoinTemporalTableWithMultisinkWithFilterPushDown2: statement set with 2 insert
  @Test
def testJoinTemporalTableWithViewWithFilterPushDown(): Unit = {
createLookupTable("LookupTableAsync1", new AsyncTableFunction1)
// util.addTable(
// """
// |CREATE TEMPORARY VIEW v_vvv AS
// |SELECT * FROM MyTable AS T
// |JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
// |ON T.a = D.id
// |""".stripMargin)
val table = util.tableEnv.sqlQuery(
"""
|SELECT * FROM MyTable AS T
|JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
|ON T.a = D.id
|""".stripMargin)

util.tableEnv.createTemporaryView("v_vvv", table)
val stmtSet = util.tableEnv.createStatementSet()

val appendSink1 = util.createRetractTableSink(
Array("a", "b","id","name"),
Array(INT, STRING, INT, STRING))
util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
"appendSink1", appendSink1)
stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery("""
|SELECT a,b,id,name FROM v_vvv
|WHERE age = 10
""".stripMargin))

stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery("""
|SELECT a,b,id,name FROM v_vvv
|WHERE age = 30
""".stripMargin))

// util.verifyPlan(stmtSet)
util.verifyExecPlan(stmtSet)
  }'''

> org.apache.flink.table.api.ValidationException: Could not find an implementation method 'eval' in class 'org.apache.flink.table.planner.plan.utils.AsyncTableFunction1' for function 'default_catalog.default_database.LookupTableAsync1, source: [TestInvalidTemporalTable(id, name, age, ts)]' that matches the following signature:
void eval(java.util.concurrent.CompletableFuture, java.lang.Integer, java.lang.Integer)

Would you please provide complete code to produce the exception, I didn't find the exception in my local environment.

Best regards,
JING ZHANG


Luan Cooper <[hidden email]> 于2021年6月8日周二 上午10:42写道:
should I post in Dev user list ?

On Mon, 7 Jun 2021 at 18:56 Luan Cooper <[hidden email]> wrote:
Hi

We're using multi sink in sql with view, the TestCase is

"""java
  @Test
  def testJoinTemporalTableWithViewWithFilterPushDown(): Unit = {
    createLookupTable("LookupTableAsync1", new AsyncTableFunction1)

    util.addTable(
      """
        |CREATE TEMPORARY VIEW v_vvv AS
        |SELECT * FROM MyTable AS T
        |JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
        |ON T.a = D.id
        |""".stripMargin)

    val stmtSet = util.tableEnv.createStatementSet()

    val appendSink1 = util.createRetractTableSink(
      Array("a", "b","id","name"),
      Array(INT, STRING, INT, STRING))
    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
      "appendSink1", appendSink1)
    stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery("""
                                                              |SELECT a,b,id,name FROM v_vvv
                                                              |WHERE age = 10
                         """.stripMargin))

    stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery("""
                                                              |SELECT a,b,id,name FROM v_vvv
                                                              |WHERE age = 30
                                                            """.stripMargin))

//    util.verifyPlan(stmtSet)
    util.verifyExecPlan(stmtSet)
  }

class AsyncTableFunction1 extends AsyncTableFunction[RowData] {
  def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer): Unit = {
  }
}
"""

the optimized exec plan is

"""
Calc(select=[a, b])(reuse_id=[1])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])

LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b, id, name])
+- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[age=10, id=a], where=[(age = 10)], select=[a, b, id, name])
   +- Reused(reference_id=[1])

LegacySink(name=[`default_catalog`.`default_database`.`appendSink2`], fields=[a, b, id, name])
+- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[age=30, id=a], where=[(age = 30)], select=[a, b, id, name])
   +- Reused(reference_id=[1])
"""

I have 2 questions
1. The lookup table function execute twice, which is very expensive
2. the age filter is push down to LookupJoin with lookup=[age=10, id=a], which result to function signature mismatch (exception follows blow)

org.apache.flink.table.api.ValidationException: Could not find an implementation method 'eval' in class 'org.apache.flink.table.planner.plan.utils.AsyncTableFunction1' for function 'default_catalog.default_database.LookupTableAsync1, source: [TestInvalidTemporalTable(id, name, age, ts)]' that matches the following signature:
void eval(java.util.concurrent.CompletableFuture, java.lang.Integer, java.lang.Integer)

Is the optimizer wrong or I'm wrong ?

Cooper.Luan