Pushing Down Filters


Pushing Down Filters

Satyam Shekhar
Hello,

I am using Flink 1.11.2 as the execution engine for an alerting application. Our application builds atop Flink's SQL API to run streaming and batch jobs on a proprietary storage engine. We have a custom StreamTableSource implementation that connects to our storage engine. The connector currently implements the ProjectableTableSource interface. I now wish to extend the connector to push down filters to the source for improved performance. I have run into multiple issues in that effort:

1. The optimizer does not use both ProjectableTableSource and FilterableTableSource in a single query, even if the source implements both interfaces. Each interface works correctly when implemented independently.

2. Implementations of FilterableTableSource fail inside the optimizer for a few TPC-DS queries in batch mode.

Stacktrace:

java.lang.AssertionError: OR(=($16, _UTF-16LE'Home'), OR(=($16, _UTF-16LE'Books'), =($16, _UTF-16LE'Sports')))
   at org.apache.calcite.rel.core.Filter.<init>(Filter.java:74)
   at org.apache.calcite.rel.logical.LogicalFilter.<init>(LogicalFilter.java:68)
   at org.apache.calcite.rel.logical.LogicalFilter.copy(LogicalFilter.java:126)
   at org.apache.calcite.rel.logical.LogicalFilter.copy(LogicalFilter.java:45)
   at org.apache.flink.table.planner.plan.rules.logical.PushFilterIntoLegacyTableSourceScanRule.pushFilterIntoScan(PushFilterIntoLegacyTableSourceScanRule.scala:130)
   at org.apache.flink.table.planner.plan.rules.logical.PushFilterIntoLegacyTableSourceScanRule.onMatch(PushFilterIntoLegacyTableSourceScanRule.scala:77)
   at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:328)
   ...
   at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
   at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
   at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
   ...
   at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
   at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
   at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
   ...
   at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
   at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:86)
   at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.org$apache$flink$table$planner$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:57)
   at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45)
   at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45)
   at scala.collection.immutable.List.foreach(List.scala:392)
   at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
   at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
   at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
   ...
   at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1213)


Config:
  var settings = EnvironmentSettings.newInstance()
                                  .useBlinkPlanner()
                                  .inBatchMode()
                                  .build();
  TableEnvironment.create(settings);

3. And finally, filter expressions containing the current timestamp (and now) functions are not resolved to constant values during predicate pushdown optimization. Take the following SQL query as an example: select count(*) from T0 where T0.C2 >= current_timestamp. Here, the applyPredicate method of FilterableTableSource receives the predicate as a CallExpression of the form greaterThanOrEqual(C2, currentTimestamp()). I'd have expected currentTimestamp to be resolved to a constant value that is identical across all usages of currentTimestamp in the query.
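To illustrate the shape of what the connector sees, here is a rough, Flink-free sketch; the Call record below is a hypothetical stand-in for Flink's CallExpression, and the traversal mirrors how a source might detect a non-deterministic call it cannot evaluate itself:

```java
import java.util.List;

public class PredicateShape {
    // Hypothetical stand-in for Flink's CallExpression tree.
    record Call(String function, List<Object> args) {}

    // A source that cannot evaluate currentTimestamp() has to reject such
    // predicates and leave them for Flink to apply after the scan.
    static boolean containsNonDeterministic(Object expr) {
        if (!(expr instanceof Call c)) {
            return false;
        }
        if (c.function().equals("currentTimestamp")) {
            return true;
        }
        return c.args().stream().anyMatch(PredicateShape::containsNonDeterministic);
    }

    public static void main(String[] args) {
        // Models greaterThanOrEqual(C2, currentTimestamp()) from the query above.
        Call predicate = new Call("greaterThanOrEqual",
                List.of("C2", new Call("currentTimestamp", List.of())));
        System.out.println(containsNonDeterministic(predicate)); // true
    }
}
```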

Regards,
Satyam

Re: Pushing Down Filters

Shengkai Fang
Hi Satyam,

Currently, the community is using the new table source/sink API, and `FilterableTableSource` and `ProjectableTableSource` have been deprecated. `SupportsProjectionPushDown` and `SupportsFilterPushDown` are the new interfaces for pushing down projections and filters.

You can refer to this class for more implementation details[1].
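To get a feel for the contract, `SupportsFilterPushDown#applyFilters` asks the source to split the given filters into the ones it accepts and the ones Flink must still apply. The sketch below mimics that shape with plain Java; the canEvaluate predicate and the string filters are hypothetical placeholders for connector-specific logic and Flink's expression objects:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class FilterSplit {
    // Mirrors the shape of SupportsFilterPushDown.Result: filters the source
    // will evaluate itself vs. filters the runtime must still apply.
    record Result(List<String> accepted, List<String> remaining) {}

    static Result applyFilters(List<String> filters, Predicate<String> canEvaluate) {
        List<String> accepted = new ArrayList<>();
        List<String> remaining = new ArrayList<>();
        for (String f : filters) {
            (canEvaluate.test(f) ? accepted : remaining).add(f);
        }
        return new Result(accepted, remaining);
    }

    public static void main(String[] args) {
        // Pretend the source can evaluate everything except UDF calls.
        Result r = applyFilters(List.of("c1 = 'Home'", "my_udf(c2) > 0"),
                f -> !f.contains("my_udf"));
        System.out.println(r.accepted());  // [c1 = 'Home']
        System.out.println(r.remaining()); // [my_udf(c2) > 0]
    }
}
```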


Satyam Shekhar <[hidden email]> wrote on Tue, Jan 12, 2021 at 3:54 PM:

Re: Pushing Down Filters

Leonard Xu
In reply to this post by Satyam Shekhar
Hi, Shekhar

1. The optimizer does not use both ProjectableTableSource and FilterableTableSource in a single query, even if the source implements both interfaces. Each interface works correctly when implemented independently.

I didn't see your custom source implementation, but I think the two interfaces should work together; you can refer to the ParquetTableSource and OrcTableSource implementation details.


2. Implementations of FilterableTableSource fail inside the optimizer for a few TPC-DS queries in batch mode.

IIRC, all TPC-DS tests pass when using Parquet, which suggests the two interfaces should work fine together?
 
3. And finally, filter expressions containing the current timestamp (and now) functions are not resolved to constant values during predicate pushdown optimization. Take the following SQL query as an example: select count(*) from T0 where T0.C2 >= current_timestamp. Here, the applyPredicate method of FilterableTableSource receives the predicate as a CallExpression of the form greaterThanOrEqual(C2, currentTimestamp()). I'd have expected currentTimestamp to be resolved to a constant value that is identical across all usages of currentTimestamp in the query.

The CURRENT_TIMESTAMP function behaves the same in Flink batch and streaming SQL: the function value is calculated per record, unlike Hive, which calculates the function value once before query execution so that it can be treated as a constant value. To be honest, I tend to prefer a constant value for the function in batch mode, which is more intuitive. Could you help create an issue for this? We can have more discussion under the issue.

Best,
Leonard



Re: Pushing Down Filters

Leonard Xu
Hi, Shekhar


1. The optimizer does not use both ProjectableTableSource and FilterableTableSource in a single query, even if the source implements both interfaces. Each interface works correctly when implemented independently.

I didn't see your custom source implementation, but I think the two interfaces should work together; you can refer to the ParquetTableSource and OrcTableSource implementation details. You can also try Shengkai's proposal to use the new interfaces if you're willing to upgrade to Flink 1.12; the filesystem connector's FileSystemTableSource implements the SupportsProjectionPushDown and SupportsFilterPushDown interfaces.
 

2. Implementations of FilterableTableSource fail inside the optimizer for a few TPC-DS queries in batch mode.

IIRC, all TPC-DS tests pass when using Parquet, which suggests the two interfaces should work fine together?
 
3. And finally, filter expressions containing the current timestamp (and now) functions are not resolved to constant values during predicate pushdown optimization. Take the following SQL query as an example: select count(*) from T0 where T0.C2 >= current_timestamp. Here, the applyPredicate method of FilterableTableSource receives the predicate as a CallExpression of the form greaterThanOrEqual(C2, currentTimestamp()). I'd have expected currentTimestamp to be resolved to a constant value that is identical across all usages of currentTimestamp in the query.

The CURRENT_TIMESTAMP, NOW, CURRENT_DATE, and CURRENT_TIME functions behave the same in Flink batch and streaming SQL: the function value is calculated per record, unlike Hive, which calculates the function value once before query execution so that it can be treated as a constant value. To be honest, I tend to prefer a constant value for the function in batch mode, which is more intuitive. Could you help create an issue for this, so we can have more discussion under the issue?
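In plain Java terms, the per-record versus per-query evaluation I describe above looks roughly like this; it is only a sketch of the two semantics, not Flink internals:

```java
import java.util.List;

public class TimestampSemantics {
    // Per-query semantics (Hive-like): the clock is read once before the
    // scan, so every row is compared against the same constant.
    static long perQuery(List<Long> rowTimes, long queryStart) {
        return rowTimes.stream().filter(t -> t >= queryStart).count();
    }

    // Per-record semantics (current Flink behavior): the clock is read
    // again for every row, so the comparison value can drift mid-scan.
    static long perRecord(List<Long> rowTimes) {
        return rowTimes.stream().filter(t -> t >= System.currentTimeMillis()).count();
    }

    public static void main(String[] args) {
        // With a fixed query-start timestamp of 5, rows 5 and 10 match.
        System.out.println(perQuery(List.of(1L, 5L, 10L), 5L)); // prints 2
    }
}
```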

Best,
Leonard