Hi
We're using multiple sinks in SQL with a view. The test case is:

```scala
@Test
def testJoinTemporalTableWithViewWithFilterPushDown(): Unit = {
  createLookupTable("LookupTableAsync1", new AsyncTableFunction1)
  util.addTable(
    """
      |CREATE TEMPORARY VIEW v_vvv AS
      |SELECT * FROM MyTable AS T
      |JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
      |ON T.a = D.id
      |""".stripMargin)
  val stmtSet = util.tableEnv.createStatementSet()
  val appendSink1 = util.createRetractTableSink(
    Array("a", "b", "id", "name"),
    Array(INT, STRING, INT, STRING))
  util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
    "appendSink1", appendSink1)
  stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery(
    """
      |SELECT a,b,id,name FROM v_vvv
      |WHERE age = 10
    """.stripMargin))
  stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery(
    """
      |SELECT a,b,id,name FROM v_vvv
      |WHERE age = 30
    """.stripMargin))
  // util.verifyPlan(stmtSet)
  util.verifyExecPlan(stmtSet)
}

class AsyncTableFunction1 extends AsyncTableFunction[RowData] {
  def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer): Unit = {
  }
}
```

The optimized exec plan is:

```
Calc(select=[a, b])(reuse_id=[1])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])

LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b, id, name])
+- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[age=10, id=a], where=[(age = 10)], select=[a, b, id, name])
   +- Reused(reference_id=[1])

LegacySink(name=[`default_catalog`.`default_database`.`appendSink2`], fields=[a, b, id, name])
+- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[age=30, id=a], where=[(age = 30)], select=[a, b, id, name])
   +- Reused(reference_id=[1])
```

I have 2 questions:

1. The lookup table function executes twice, which is very expensive.
2. The age filter is pushed down to LookupJoin with lookup=[age=10, id=a], which results in a function signature mismatch (exception follows below):

    org.apache.flink.table.api.ValidationException: Could not find an implementation method 'eval' in class 'org.apache.flink.table.planner.plan.utils.AsyncTableFunction1' for function 'default_catalog.default_database.LookupTableAsync1, source: [TestInvalidTemporalTable(id, name, age, ts)]' that matches the following signature:
    void eval(java.util.concurrent.CompletableFuture, java.lang.Integer, java.lang.Integer)

Is the optimizer wrong, or am I?

Cooper.Luan
Should I post this on the dev mailing list?

On Mon, 7 Jun 2021 at 18:56 Luan Cooper <[hidden email]> wrote:
Hi Cooper.Luan,

> I have 2 questions
> 1. The lookup table function executes twice, which is very expensive
> 2. The age filter is pushed down to LookupJoin with lookup=[age=10, id=a], which results in a function signature mismatch (exception follows below)

If you need to reuse the LookupJoin as in the plan above, you could use `createTemporaryView` to register the LookupJoin result as a temporary view. The code looks like the following:

''' @Test }'''

> org.apache.flink.table.api.ValidationException: Could not find an implementation method 'eval' in class 'org.apache.flink.table.planner.plan.utils.AsyncTableFunction1' for function 'default_catalog.default_database.LookupTableAsync1, source: [TestInvalidTemporalTable(id, name, age, ts)]' that matches the following signature:
> void eval(java.util.concurrent.CompletableFuture, java.lang.Integer, java.lang.Integer)

Would you please provide the complete code that reproduces the exception? I could not reproduce it in my local environment.

Best regards,
JING ZHANG

On Tue, Jun 8, 2021 at 10:42 AM, Luan Cooper <[hidden email]> wrote:
I have attached the test case at https://github.com/CooperLuan/flink/blob/lookup_optimize/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala#L334-L399

On Tue, Jun 8, 2021 at 1:50 PM JING ZHANG <[hidden email]> wrote:
You're right, the multi-insert statement reuses the lookup result as expected: #testJoinTemporalTableWithMultisinkWithFilterPushDown2 shows the right result. But testJoinTemporalTableWithMultisinkWithFilterPushDown1 is a bad case, which results in the lookup function signature mismatch.

The difference between the two tests:

testJoinTemporalTableWithMultisinkWithFilterPushDown1: statement set with 1 insert
testJoinTemporalTableWithMultisinkWithFilterPushDown2: statement set with 2 inserts
I should have used a patch instead of a git repo. Sorry for the inconvenience.

On Tue, Jun 8, 2021 at 2:49 PM Luan Cooper <[hidden email]> wrote:
Attachment: 0001-try-to-produce-lookup-join-filter-pushdown-expensive.patch (13K)
Hi Cooper.Luan,
Sorry for the late response. I applied your patch locally.

> 1. The lookup table function executes twice, which is very expensive

As mentioned before, using `createTemporaryView` as in `testJoinTemporalTableWithMultisinkWithFilterPushDown2` lets the LookupJoin be reused.

Why does this work? Flink has a SubGraphBasedOptimizer which decomposes the RelNode DAG into multiple common sub-graphs. Each sub-graph is a tree and can be optimized independently by the Calcite optimizer. In `testJoinTemporalTableWithMultisinkWithFilterPushDown2`, the RelNodes are decomposed into 3 sub-graphs: the first contains the LookupJoin, the second contains sink1, and the third contains sink2. Those three sub-graphs are optimized by the Calcite planner independently, so neither filter can be pushed into the shared LookupJoin.

> 2. Why does SubGraphBasedOptimizer seem not to work for `testJoinTemporalTableWithMultisinkWithFilterPushDown1`?

StreamCommonSubGraphBasedOptimizer skips the decomposition if there is only one sink block, because the whole RelNode DAG is already a RelNode tree. So `LookupJoin` and 'Calc(Filter: age=10)' belong to the same sub-graph, and the filter is pushed into the join.

> 3. After pushing down to LookupJoin with lookup=[age=10, id=a], the result is a function signature mismatch (exception follows below)

After 'age=10' is pushed down, the optimizer wants to find a method that matches two lookup keys, 'age, id'. We could improve this case to avoid throwing an exception: if no method matching all keys is found, load all records from the right table that match 'id=a', then filter the output by 'age=10' afterwards. I will add a JIRA for this improvement.

However, it is better if you add a method that matches void eval(java.util.concurrent.CompletableFuture, java.lang.Integer, java.lang.Integer), because that is equivalent to pushing the predicate down to the right table, which reduces the amount of data transferred.

Best regards,
JING ZHANG

On Wed, Jun 9, 2021 at 2:17 PM, Luan Cooper <[hidden email]> wrote:
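To make the sub-graph explanation above concrete, here is a minimal toy sketch in plain Scala. It is not Flink's planner code: `Node`, `SubGraphSketch`, `reachable` and `roots` are hypothetical names, and the rule "every sink plus every node with more than one consumer starts its own sub-graph" is a simplifying assumption about what the optimizer does.

```scala
// Toy model only: these types are hypothetical and are NOT Flink's planner classes.
final case class Node(name: String, inputs: List[Node] = Nil)

object SubGraphSketch {
  /** Collects every node reachable from the given sinks, each node once. */
  def reachable(sinks: List[Node]): Set[Node] = {
    def go(n: Node, seen: Set[Node]): Set[Node] =
      if (seen(n)) seen
      else n.inputs.foldLeft(seen + n)((acc, in) => go(in, acc))
    sinks.foldLeft(Set.empty[Node])((acc, s) => go(s, acc))
  }

  /**
   * Assumed rule: a sub-graph starts at every sink and at every node that is
   * consumed by more than one other node. Returns the names of those roots.
   */
  def roots(sinks: List[Node]): Set[String] = {
    val all = reachable(sinks)
    // Count how many consumers each node has.
    val consumerCount: Map[Node, Int] =
      all.toList.flatMap(_.inputs).groupBy(identity).map { case (n, uses) => n -> uses.size }
    val shared = consumerCount.collect { case (n, c) if c > 1 => n.name }.toSet
    sinks.map(_.name).toSet ++ shared
  }
}
```

With two sinks that both read from the same join node, the join gets its own root, so a filter sitting above it in a sink's tree cannot be merged into it. With a single sink there is only one root, so the filter and the join are optimized together, which is the situation in `testJoinTemporalTableWithMultisinkWithFilterPushDown1`.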
Thanks for your explanation, it helps. I created an issue for this: https://issues.apache.org/jira/browse/FLINK-22955

On Wed, Jun 9, 2021 at 3:50 PM JING ZHANG <[hidden email]> wrote:
Great, thanks for fixing the issue. I've attached this email to the JIRA.

On Thu, Jun 10, 2021 at 2:28 PM, Luan Cooper <[hidden email]> wrote:
Hi Cooper Luan,

As we discussed before, the root cause of the exception is that LookupJoin wants to find a method that matches two lookup keys, 'age, id', after the optimizer pushes age=10 down into LookupJoin.

There is a possible improvement to avoid throwing an exception: if no method matching two lookup keys is found, try to match a method with one lookup key. The dimension table returns all records that match 'id=a', and a later Calc filters the output by 'age=10'.

But for this to work, we need to know whether the only argument of the `eval` method represents 'age' or 'id'. That hint is missing in the current API.

As discussed with Jark Wu, we think this improvement would involve refactoring `LookupTableSource`. We prefer to let the user handle this case by adding an `eval` method that matches the lookup keys. For example:

    // first solution: one eval method with a variable number of arguments
    @SerialVersionUID(1L)
    class AsyncTableFunction1 extends AsyncTableFunction[RowData] {
      @varargs
      def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer*): Unit = {
      }
    }

    // second solution: multiple eval methods, each with a fixed number of arguments
    @SerialVersionUID(1L)
    class AsyncTableFunction1 extends AsyncTableFunction[RowData] {
      def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer): Unit = {
      }

      def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer, b: Integer): Unit = {
      }
    }

What do you think, Cooper Luan?

On Thu, Jun 10, 2021 at 4:17 PM, JING ZHANG <[hidden email]> wrote:
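As an illustration of the second solution above, the following is a minimal sketch with non-empty `eval` bodies, written in plain Scala without any Flink dependency. `DimRow` and `InMemoryAsyncLookup` are hypothetical stand-ins for `RowData` and a real dimension table, and the argument order `(id, age)` in the two-key overload is an assumption; the order the planner actually passes the keys in is not stated in this thread.

```scala
import java.util.concurrent.CompletableFuture
import java.util.{ArrayList => JArrayList, Collection => JCollection}

// Hypothetical row type standing in for Flink's RowData.
final case class DimRow(id: Int, name: String, age: Int)

// Hypothetical in-memory dimension table; a real function would query an external store.
class InMemoryAsyncLookup(rows: Seq[DimRow]) {

  // Copies the matching rows into a Java collection and completes the future with it.
  private def complete(
      resultFuture: CompletableFuture[JCollection[DimRow]],
      matches: Seq[DimRow]): Unit = {
    val out = new JArrayList[DimRow]()
    matches.foreach(r => out.add(r))
    resultFuture.complete(out)
  }

  // One lookup key: id only.
  def eval(resultFuture: CompletableFuture[JCollection[DimRow]], id: Integer): Unit =
    complete(resultFuture, rows.filter(_.id == id.intValue))

  // Two lookup keys. ASSUMPTION: the keys arrive as (id, age).
  // Filtering on both keys here means fewer rows are returned to the join.
  def eval(
      resultFuture: CompletableFuture[JCollection[DimRow]],
      id: Integer,
      age: Integer): Unit =
    complete(resultFuture, rows.filter(r => r.id == id.intValue && r.age == age.intValue))
}
```

The one-key overload followed by a filter on age returns the same rows as the two-key overload; the difference is only how much data leaves the lookup side, which is the trade-off described earlier in the thread.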