Issue with FilterableTableSource and the logical optimizer rules

Itamar Ravid
Hi, I’m facing a strange issue with Flink 1.8.1. I’ve implemented a StreamTableSource that also implements FilterableTableSource and ProjectableTableSource. However, during logical plan optimization (TableEnvironment.scala:288), the applyPredicate method is called, but the resulting plan does NOT contain the source with the filter pushed down.

It appears that the problem is in the VolcanoPlanner.findBestExp method; when it reaches “root.buildCheapestPlan”, the resulting RelNode does not contain the filtered source.

Additionally, I added a breakpoint in FlinkLogicalTableSourceScan#computeSelfCost, and the tableSource never has the predicates pushed. I verified that in the PushFilterIntoTableSourceScanRule, the resulting source always has the predicates pushed.

Amusingly, this issue causes queries like “SELECT a FROM src WHERE a = 123” to be rewritten to “SELECT 123 FROM src” :-)

Does anyone have any advice on debugging/working around this without disabling predicate pushdown on the source?
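
To make the symptom above concrete: replacing the projected column with the constant 123 is a valid simplification only while the predicate a = 123 is still applied somewhere, for example inside the source. The following is a toy model in plain Java, not Flink code; ToySource and the method names are hypothetical and exist only for this illustration. It shows the rewritten query agreeing with the original when the filter stays in the source, and returning too many rows when the filter is lost.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class ConstantFoldingDemo {

    // Hypothetical stand-in for a table source over one integer column "a".
    // It only models "a scan with an optional pushed-down filter".
    static final class ToySource {
        final List<Integer> rows;
        final Predicate<Integer> pushedFilter; // null means nothing was pushed down

        ToySource(List<Integer> rows, Predicate<Integer> pushedFilter) {
            this.rows = rows;
            this.pushedFilter = pushedFilter;
        }

        List<Integer> scan() {
            if (pushedFilter == null) {
                return rows;
            }
            return rows.stream().filter(pushedFilter).collect(Collectors.toList());
        }
    }

    // Reference semantics of: SELECT a FROM src WHERE a = 123
    static List<Integer> original(List<Integer> rows) {
        return rows.stream().filter(a -> a == 123).collect(Collectors.toList());
    }

    // Rewritten form: SELECT 123 FROM <source>, with no filter left in the query itself.
    static List<Integer> rewritten(ToySource source) {
        return source.scan().stream().map(a -> 123).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> rows = Arrays.asList(1, 123, 7, 123);
        ToySource filtered = new ToySource(rows, a -> a == 123);
        ToySource unfiltered = new ToySource(rows, null);

        System.out.println(original(rows));        // [123, 123]
        System.out.println(rewritten(filtered));   // [123, 123]
        System.out.println(rewritten(unfiltered)); // [123, 123, 123, 123]
    }
}
```

The third line of output is the incorrect result described in the post: the constant was folded into the projection, but the source that should have carried the filter did not make it into the final plan.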

Re: Issue with FilterableTableSource and the logical optimizer rules

Rong Rong
Hi Itamar,

The problem you described sounds similar to this ticket[1].
Can you check whether the solution listed there resolves your issue?

--
Rong


(In reply to Itamar Ravid's post of Mon, Aug 19, 2019, 8:56 AM.)

Re: Issue with FilterableTableSource and the logical optimizer rules

Itamar Ravid
Hi Rong, thanks for the hint - that solved the issue.
(In reply to Rong Rong's message of 20 Aug 2019, 0:06 +0300.)
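
The ticket behind [1] is not reproduced in this thread, so the actual fix is not recorded here. One mechanism that would match the symptoms, offered purely as an assumption, is that a planner identifies equivalent nodes by a description string. If the filtered copy of a source describes itself exactly like the unfiltered one (in Flink, a table source's description comes from explainSource()), the two can be treated as the same node and the filtered copy may be dropped. The sketch below is a toy model in plain Java with hypothetical names (ToyScan, ToyRegistry); it is not Flink or Calcite code.

```java
import java.util.HashMap;
import java.util.Map;

public class DigestDemo {

    // Hypothetical scan node, identified only by its description string.
    static final class ToyScan {
        final String table;
        final String pushedPredicate;    // null when nothing was pushed down
        final boolean describePredicate; // whether the description mentions the predicate

        ToyScan(String table, String pushedPredicate, boolean describePredicate) {
            this.table = table;
            this.pushedPredicate = pushedPredicate;
            this.describePredicate = describePredicate;
        }

        String describe() {
            if (describePredicate && pushedPredicate != null) {
                return "Scan(" + table + ", filter=[" + pushedPredicate + "])";
            }
            return "Scan(" + table + ")";
        }
    }

    // Hypothetical registry that keeps one node per distinct description.
    static final class ToyRegistry {
        private final Map<String, ToyScan> nodesByDescription = new HashMap<>();

        // Returns the node already registered under the same description, or the new node.
        ToyScan register(ToyScan node) {
            ToyScan existing = nodesByDescription.putIfAbsent(node.describe(), node);
            return existing != null ? existing : node;
        }
    }

    // Registers an unfiltered scan, then a filtered copy, and reports
    // whether the filtered copy is kept as a distinct node.
    static boolean filteredCopySurvives(boolean describePredicate) {
        ToyRegistry registry = new ToyRegistry();
        registry.register(new ToyScan("src", null, describePredicate));
        ToyScan filtered = new ToyScan("src", "a = 123", describePredicate);
        return registry.register(filtered) == filtered;
    }

    public static void main(String[] args) {
        System.out.println(filteredCopySurvives(false)); // false: descriptions collide
        System.out.println(filteredCopySurvives(true));  // true: descriptions differ
    }
}
```

If this assumption applies to a given source, the thing to check is whether its description changes after applyPredicate returns the new instance; that is a debugging hint, not a confirmed account of what the ticket prescribes.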