Hello,
Hello,

I am using Flink 1.11.2 as the execution engine for an alerting application. Our application builds atop Flink's SQL API to run streaming and batch jobs on a proprietary storage engine. We have a custom StreamTableSource implementation that connects to our storage engine; the connector currently implements the ProjectableTableSource interface. I now wish to extend the connector to push filters down to the source for improved performance. I have run into multiple issues in that effort:

1. The optimizer does not use both ProjectableTableSource and FilterableTableSource in a single query, even if the source implements both interfaces. Each interface works correctly when implemented independently.

2. Implementations of FilterableTableSource fail inside the optimizer for a few TPC-DS queries in batch mode. Stacktrace:

    java.lang.AssertionError: OR(=($16, _UTF-16LE'Home'), OR(=($16, _UTF-16LE'Books'), =($16, _UTF-16LE'Sports')))
        at org.apache.calcite.rel.core.Filter.<init>(Filter.java:74)
        at org.apache.calcite.rel.logical.LogicalFilter.<init>(LogicalFilter.java:68)
        at org.apache.calcite.rel.logical.LogicalFilter.copy(LogicalFilter.java:126)
        at org.apache.calcite.rel.logical.LogicalFilter.copy(LogicalFilter.java:45)
        at org.apache.flink.table.planner.plan.rules.logical.PushFilterIntoLegacyTableSourceScanRule.pushFilterIntoScan(PushFilterIntoLegacyTableSourceScanRule.scala:130)
        at org.apache.flink.table.planner.plan.rules.logical.PushFilterIntoLegacyTableSourceScanRule.onMatch(PushFilterIntoLegacyTableSourceScanRule.scala:77)
        at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:328)
        ...
        at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
        ...
        at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
        ...
        at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
        at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:86)
        at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.org$apache$flink$table$planner$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:57)
        at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45)
        at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
        at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
        at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
        ...
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1213)

Config:

    var settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inBatchMode()
        .build();
    TableEnvironment.create(settings);

3. And finally, filter expressions containing the CURRENT_TIMESTAMP (and NOW) functions are not resolved to constant values during predicate pushdown optimization. Take the following SQL query as an example:

    select count(*) from T0 where T0.C2 >= current_timestamp

Here, the applyPredicate method of FilterableTableSource receives the predicate as a CallExpression of the form greaterThanOrEqual(C2, currentTimestamp()). I would have expected currentTimestamp to be resolved to a constant value that is identical across all usages of currentTimestamp in the query.

Regards,
Satyam
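[For context, a minimal sketch of a legacy source implementing both interfaces, against the Flink 1.11 legacy API. MyStorageSource and canHandle are invented names, the actual scan is omitted as storage-engine specific, and the explainSource comment is a recollection rather than a guarantee.]

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.sources.FilterableTableSource;
import org.apache.flink.table.sources.ProjectableTableSource;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.types.Row;

// Hypothetical connector; only the pushdown-related methods are fleshed out.
public class MyStorageSource implements StreamTableSource<Row>,
        ProjectableTableSource<Row>, FilterableTableSource<Row> {

    private final TableSchema schema;
    private final int[] projectedFields;          // null => no projection pushed down yet
    private final List<Expression> pushedFilters; // filters accepted by the source

    public MyStorageSource(TableSchema schema, int[] projectedFields,
                           List<Expression> pushedFilters) {
        this.schema = schema;
        this.projectedFields = projectedFields;
        this.pushedFilters = pushedFilters;
    }

    @Override
    public TableSource<Row> projectFields(int[] fields) {
        // Return a NEW instance; the planner treats sources as immutable.
        return new MyStorageSource(schema, fields, pushedFilters);
    }

    @Override
    public TableSource<Row> applyPredicate(List<Expression> predicates) {
        // Remove from the (mutable) list the predicates this source can
        // evaluate; whatever remains is applied by Flink after the scan.
        List<Expression> accepted = new ArrayList<>();
        Iterator<Expression> it = predicates.iterator();
        while (it.hasNext()) {
            Expression p = it.next();
            if (canHandle(p)) {
                accepted.add(p);
                it.remove();
            }
        }
        return new MyStorageSource(schema, projectedFields, accepted);
    }

    @Override
    public boolean isFilterPushedDown() {
        return !pushedFilters.isEmpty();
    }

    @Override
    public String explainSource() {
        // IIRC the planner relies on this digest changing once filters or
        // projections are pushed down, so include them here.
        return "MyStorageSource(projected=" + Arrays.toString(projectedFields)
                + ", filters=" + pushedFilters.size() + ")";
    }

    private boolean canHandle(Expression predicate) {
        return false; // storage-engine specific; conservatively reject here
    }

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
        throw new UnsupportedOperationException("storage-engine specific");
    }

    @Override
    public TableSchema getTableSchema() {
        return schema;
    }

    @Override
    public TypeInformation<Row> getReturnType() {
        return schema.toRowType();
    }
}
```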
Hi Satyam,

Currently, the community is using the new table source/sink API, and `FilterableTableSource` and `ProjectableTableSource` have been deprecated. The interfaces `SupportsProjectionPushDown` and `SupportsFilterPushDown` are the new interfaces for pushing down projections and filters. You can refer to this class for more implementation details [1].

Satyam Shekhar <[hidden email]> wrote on Tue, Jan 12, 2021 at 3:54 PM:
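[A rough sketch of what those new abilities look like on a DynamicTableSource, against the Flink 1.12 API. MyDynamicSource and canHandle are invented names, and the scan runtime is left unimplemented as it is storage-engine specific.]

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.expressions.ResolvedExpression;

// Hypothetical connector showing the new pushdown abilities.
public class MyDynamicSource implements ScanTableSource,
        SupportsProjectionPushDown, SupportsFilterPushDown {

    private int[][] projectedFields;
    private List<ResolvedExpression> pushedFilters = new ArrayList<>();

    @Override
    public Result applyFilters(List<ResolvedExpression> filters) {
        List<ResolvedExpression> accepted = new ArrayList<>();
        List<ResolvedExpression> remaining = new ArrayList<>();
        for (ResolvedExpression f : filters) {
            if (canHandle(f)) {
                accepted.add(f);
            } else {
                remaining.add(f);
            }
        }
        this.pushedFilters = accepted;
        // Filters in `remaining` are evaluated by Flink after the scan.
        return Result.of(accepted, remaining);
    }

    @Override
    public boolean supportsNestedProjection() {
        return false; // only top-level fields in this sketch
    }

    @Override
    public void applyProjection(int[][] projectedFields) {
        this.projectedFields = projectedFields;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        throw new UnsupportedOperationException("storage-engine specific");
    }

    @Override
    public DynamicTableSource copy() {
        MyDynamicSource copy = new MyDynamicSource();
        copy.projectedFields = projectedFields;
        copy.pushedFilters = new ArrayList<>(pushedFilters);
        return copy;
    }

    @Override
    public String asSummaryString() {
        return "MyDynamicSource";
    }

    private boolean canHandle(ResolvedExpression filter) {
        return false; // storage-engine specific
    }
}
```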
Hi, Shekhar

I didn't see your custom source implementation, but I think the two interfaces should work together; you can refer to the ParquetTableSource and OrcTableSource implementations for details. You can also try ShengKai's proposal to use the new interfaces if you're willing to upgrade to Flink 1.12: the FilesystemTableSource implements the SupportsProjectionPushDown and SupportsFilterPushDown interfaces.

IIRC, all TPC-DS tests pass using Parquet, which suggests the two interfaces work fine together.

The CURRENT_TIMESTAMP, NOW, CURRENT_DATE, and CURRENT_TIME functions have the same behavior in Flink batch and streaming SQL: the function value is calculated per record. This is unlike Hive, which calculates the value once before query execution, so that it can be treated as a constant. To be honest, I tend to prefer a constant value for these functions in batch, which is more intuitive. Could you help create an issue for this, so we can have more discussion under the issue?

Best,
Leonard
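[The per-record versus per-query distinction can be illustrated without Flink at all. The sketch below is plain Java with invented names; a counter stands in for the wall clock so the behavior is deterministic.]

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

class TimestampSemantics {
    // Evaluate the "clock" once per record (Flink's current behavior).
    static List<Long> perRecord(int rows, Supplier<Long> clock) {
        List<Long> out = new ArrayList<>();
        for (int i = 0; i < rows; i++) {
            out.add(clock.get());
        }
        return out;
    }

    // Snapshot the "clock" once before execution (Hive-style constant).
    static List<Long> snapshot(int rows, Supplier<Long> clock) {
        long constant = clock.get();
        List<Long> out = new ArrayList<>();
        for (int i = 0; i < rows; i++) {
            out.add(constant);
        }
        return out;
    }

    public static void main(String[] args) {
        // Deterministic stand-in for System.currentTimeMillis().
        AtomicLong fakeClock = new AtomicLong(0);
        System.out.println(perRecord(3, fakeClock::incrementAndGet)); // [1, 2, 3] — a new value per record
        System.out.println(snapshot(3, fakeClock::incrementAndGet));  // [4, 4, 4] — one value for the whole query
    }
}
```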