Add Logical filter on a query plan from Flink Table API

Felipe Gutierrez
Hi,

I am new to Apache Calcite and I am trying to use it with Apache Flink. To start, I am trying to create a "Hello World" rule that just adds a logical filter to my query.
1 - I have my Flink app using Table API [1].
2 - I have created a Calcite filter rule, which is applied to my Flink query when I use CalciteConfig cc = new CalciteConfigBuilder().addLogicalOptRuleSet(RuleSets.ofList(MyFilterRule.INSTANCE)).build() [2];
3 - When debugging, execution only reaches my rule if the query already contains a filter.
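Item 3 is expected behaviour: a Calcite rule is only offered the nodes that match its operand pattern, so a rule declared on a Filter operand never fires on a plan that has no Filter. The matching can be pictured with a toy model in plain Java; every class here is hypothetical and there is no Calcite dependency.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, not Calcite: the "planner" offers a node to a rule only when
// the node is an instance of the rule's operand class.
class OperandMatchDemo {
    // Hypothetical minimal plan nodes.
    static abstract class Node {
        final List<Node> inputs;
        Node(List<Node> inputs) { this.inputs = inputs; }
    }

    static final class Scan extends Node {
        final String table;
        Scan(String table) { super(List.of()); this.table = table; }
    }

    static final class Filter extends Node {
        final String condition;
        Filter(String condition, Node input) { super(List.of(input)); this.condition = condition; }
    }

    // Hypothetical rule: remembers every node it was offered.
    static final class Rule {
        final Class<? extends Node> operand;
        final List<Node> fired = new ArrayList<>();
        Rule(Class<? extends Node> operand) { this.operand = operand; }
    }

    // Depth-first walk: the rule fires only on nodes matching its operand.
    static void apply(Rule rule, Node node) {
        if (rule.operand.isInstance(node)) {
            rule.fired.add(node);
        }
        for (Node input : node.inputs) {
            apply(rule, input);
        }
    }

    public static void main(String[] args) {
        Rule onFilteredQuery = new Rule(Filter.class);
        apply(onFilteredQuery, new Filter(">=($6, 50)", new Scan("TicketsStation01Plat01")));

        Rule onPlainScan = new Rule(Filter.class);
        apply(onPlainScan, new Scan("TicketsStation01Plat01"));

        // prints "1 0": the rule never fires when the plan has no Filter
        System.out.println(onFilteredQuery.fired.size() + " " + onPlainScan.fired.size());
    }
}
```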

I would like to add a logical filter when the query does not contain one. How should I implement this?
I can see a LogicalFilter being created when I call "tableEnv.explain()", so I suppose I can add logical filters to the plan.

== Abstract Syntax Tree ==
LogicalFilter(condition=[>=($6, 50)])
  LogicalTableScan(table=[[TicketsStation01Plat01]])

== Optimized Logical Plan ==
DataStreamCalc(select=[sensorId, sensorType, platformId, platformType, stationId, timestamp, value, trip, eventTime], where=[>=(value, 50)])
  StreamTableSourceScan(table=[[TicketsStation01Plat01]], fields=[sensorId, sensorType, platformId, platformType, stationId, timestamp, value, trip, eventTime], source=[SensorTuples])

== Physical Execution Plan ==
....
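In the AST, `$6` is a zero-based reference to the seventh input field. Resolving it against the field list printed for StreamTableSourceScan gives `value`, which is why the optimized plan shows `where=[>=(value, 50)]`. A small plain-Java sketch of that lookup; the class and method names are illustrative only.

```java
import java.util.List;

// Resolves Calcite-style input references such as "$6" against the
// field list printed for StreamTableSourceScan in the plan above.
class InputRefDemo {
    static final List<String> FIELDS = List.of(
        "sensorId", "sensorType", "platformId", "platformType", "stationId",
        "timestamp", "value", "trip", "eventTime");

    // "$6" -> FIELDS.get(6); indexes are zero-based.
    static String resolve(String inputRef) {
        if (!inputRef.startsWith("$")) {
            throw new IllegalArgumentException("not an input reference: " + inputRef);
        }
        int index = Integer.parseInt(inputRef.substring(1));
        return FIELDS.get(index);
    }

    public static void main(String[] args) {
        System.out.println(resolve("$6")); // prints "value"
    }
}
```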

Thanks,
Felipe

[1] https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/table/HelloWorldCalcitePlanTableAPI.java#L62
[2] https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/calcite/rules/MyFilterRule.java#L14
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez

Re: Add Logical filter on a query plan from Flink Table API

Hequn Cheng
Hi Felipe,

> I would like to create a logical filter if there is no filter set on the logical query. How should I implement it?
Do you mean you want to add a LogicalFilter node even if the query doesn't contain a filter? In principle, this can be done with a rule. However, it sounds a little hacky, and you have to pay attention to semantics. One thing to note is that a rule must not change the row type: for NodeA -> rule -> NodeB, NodeB must have the same row type as NodeA.
Flink has many rules that I think are good examples for you. You can find them in the `FlinkRuleSets` class.
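The row-type invariant can be illustrated with a toy model in plain Java (hypothetical classes, no Calcite dependency; a row type is simplified to a list of field names). A filter drops rows but never columns, so wrapping a node in a filter keeps its row type, while a projection can change it.

```java
import java.util.List;

// Toy illustration of the invariant: a rule may replace NodeA with NodeB
// only if both produce the same row type.
class RowTypeInvariantDemo {
    interface Node { List<String> rowType(); }

    static final class Scan implements Node {
        private final List<String> fields;
        Scan(List<String> fields) { this.fields = fields; }
        public List<String> rowType() { return fields; }
    }

    // A filter drops rows, never columns, so it keeps its input's row type.
    static final class Filter implements Node {
        private final Node input;
        Filter(Node input) { this.input = input; }
        public List<String> rowType() { return input.rowType(); }
    }

    // A projection decides its own columns (input omitted for brevity).
    static final class Project implements Node {
        private final List<String> fields;
        Project(List<String> fields) { this.fields = fields; }
        public List<String> rowType() { return fields; }
    }

    // The kind of check a planner applies before accepting a rewrite.
    static boolean isValidRewrite(Node before, Node after) {
        return before.rowType().equals(after.rowType());
    }

    public static void main(String[] args) {
        Node scan = new Scan(List.of("sensorId", "value"));
        System.out.println(isValidRewrite(scan, new Filter(scan)));              // true
        System.out.println(isValidRewrite(scan, new Project(List.of("value")))); // false
    }
}
```

The Filter rewrite passes the row-type check yet returns fewer rows than the bare scan, so it is not an equivalent plan; that is the semantic problem to watch for.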

> I see my LogicalFilter been created when I call "tableEnv.explain()" method. I suppose that I can add some logical filters on the plan.
The `LogicalFilter` and `DataStreamCalc` are not created by your filter rule. If you remove your filter rule, nothing changes in the plan.

Best, Hequn

(In reply to Felipe Gutierrez, Mon, Jul 8, 2019 at 11:13 PM)

Re: Add Logical filter on a query plan from Flink Table API

Felipe Gutierrez
Hi Hequn,

What is the role of RelFactories [1] when I use it to create the INSTANCE of my rule? For example:

public static final MyFilterRule INSTANCE = new MyFilterRule(Filter.class, RelFactories.LOGICAL_BUILDER);

Then I create a CalciteConfig using "new CalciteConfigBuilder()" with .addLogicalOptRuleSet(), .addNormRuleSet(), and .addPhysicalOptRuleSet().


(In reply to Hequn Cheng, Tue, Jul 9, 2019 at 5:06 AM)

Re: Add Logical filter on a query plan from Flink Table API

Hequn Cheng
Hi Felipe,

> what is the relation of RelFactories [1] when I use it to create the INSTANCE of my rule?
`RelFactories.LOGICAL_BUILDER` is used during the rule transformation: it is a `RelBuilderFactory`, whose `create` method returns a `RelBuilder`. The `RelBuilder` [1] is used to create relational expressions.
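The factory/builder relationship can be sketched with hypothetical miniature classes in plain Java. They only mirror the shape of Calcite's RelBuilderFactory and RelBuilder: in real Calcite, `create` takes a RelOptCluster and a RelOptSchema, conditions are RexNodes rather than strings, and inside a rule one would normally obtain a builder via `call.builder()`.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical miniature of the factory/builder split; not Calcite's API.
class BuilderFactoryDemo {
    interface BuilderFactory { Builder create(); }

    // Stack-based builder: push an input, stack operators on top, build the tree.
    static final class Builder {
        private final Deque<String> stack = new ArrayDeque<>();

        Builder scan(String table) {
            stack.push("Scan(" + table + ")");
            return this;
        }

        // Wraps the node on top of the stack in a filter.
        Builder filter(String condition) {
            String input = stack.pop();
            stack.push("Filter(" + condition + ", " + input + ")");
            return this;
        }

        String build() { return stack.pop(); }
    }

    // Plays the role of RelFactories.LOGICAL_BUILDER: a rule keeps the factory
    // and asks it for a fresh builder each time it fires.
    static final BuilderFactory LOGICAL_BUILDER = Builder::new;

    public static void main(String[] args) {
        String plan = LOGICAL_BUILDER.create()
            .scan("TicketsStation01Plat01")
            .filter(">=($6, 50)")
            .build();
        System.out.println(plan); // Filter(>=($6, 50), Scan(TicketsStation01Plat01))
    }
}
```

Passing the factory into the rule's constructor, rather than hard-coding node classes, is what lets the same rule produce different node implementations.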

You could also post the question to the Calcite mailing list; they may be able to give more details. :-)

Best,
Hequn



(In reply to Felipe Gutierrez, Tue, Jul 9, 2019 at 4:06 PM)

Re: Add Logical filter on a query plan from Flink Table API

Felipe Gutierrez
Hi Hequn,

It has been very hard to find even a small tutorial on how to create my own rule in Calcite + Flink. What I did was copy a Calcite rule into my project and try to understand it. I am working with FilterJoinRule [1], which is one of the rules that Flink modifies. Ultimately, I want to create a rule for Join operators that lets me choose between different join algorithm implementations (nested-loop, sort-merge, hash).
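The choice between nested-loop, sort-merge and hash join can be pictured with a toy decision function in plain Java. This is a hypothetical heuristic, not Flink's or Calcite's logic; in a cost-based planner one would more likely have a rule emit one physical alternative per algorithm and let the cost model pick the cheapest.

```java
// Hypothetical heuristic for picking a join algorithm.
class JoinAlgorithmChoiceDemo {
    enum Algorithm { NESTED_LOOP, SORT_MERGE, HASH }

    static Algorithm choose(boolean hasEquiKeys, boolean inputsSortedOnKeys) {
        if (!hasEquiKeys) {
            // Without equality keys there is nothing to hash on: fall back to nested loops.
            return Algorithm.NESTED_LOOP;
        }
        if (inputsSortedOnKeys) {
            // Inputs already sorted on the keys let a merge join skip its sort phase.
            return Algorithm.SORT_MERGE;
        }
        return Algorithm.HASH;
    }

    public static void main(String[] args) {
        System.out.println(choose(false, false)); // NESTED_LOOP
        System.out.println(choose(true, true));   // SORT_MERGE
        System.out.println(choose(true, false));  // HASH
    }
}
```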

If you have any step-by-step guide to understanding the "RelOptRuleCall" parameter, that would be very nice =). But I guess I have to keep digging into the code...
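A rough picture of what the RelOptRuleCall parameter provides, as a toy model in plain Java. The classes are hypothetical; only the method names `rel` and `transformTo` mirror Calcite's RelOptRuleCall. `rel(i)` returns the node matched by the i-th operand, root operand first, and `transformTo(...)` hands the planner an expression meant to be equivalent to `rel(0)`. The example rule is a simplified, inner-join-only version of the idea behind FilterJoinRule.

```java
import java.util.ArrayList;
import java.util.List;

class RuleCallDemo {
    interface Node {}

    static final class Scan implements Node {
        final String table;
        Scan(String table) { this.table = table; }
        public String toString() { return "Scan(" + table + ")"; }
    }

    static final class Filter implements Node {
        final String condition; final Node input;
        Filter(String condition, Node input) { this.condition = condition; this.input = input; }
        public String toString() { return "Filter(" + condition + ", " + input + ")"; }
    }

    static final class Join implements Node {
        final String condition; final Node left; final Node right;
        Join(String condition, Node left, Node right) {
            this.condition = condition; this.left = left; this.right = right;
        }
        public String toString() { return "Join(" + condition + ", " + left + ", " + right + ")"; }
    }

    // What a rule receives: matched nodes in operand order, plus a sink for results.
    static final class RuleCall {
        private final List<Node> matched;
        final List<Node> results = new ArrayList<>();
        RuleCall(Node... matched) { this.matched = List.of(matched); }

        @SuppressWarnings("unchecked")
        <T extends Node> T rel(int ordinal) { return (T) matched.get(ordinal); }

        void transformTo(Node equivalent) { results.add(equivalent); }
    }

    // Operands: Filter on top of Join, so rel(0) is the Filter and rel(1) the Join.
    // Valid for inner joins only: merge the filter condition into the join condition.
    static void onMatch(RuleCall call) {
        Filter filter = call.rel(0);
        Join join = call.rel(1);
        call.transformTo(new Join(join.condition + " AND " + filter.condition, join.left, join.right));
    }

    public static void main(String[] args) {
        Join join = new Join("a.id = b.id", new Scan("a"), new Scan("b"));
        RuleCall call = new RuleCall(new Filter("a.x > 10", join), join);
        onMatch(call);
        System.out.println(call.results.get(0)); // Join(a.id = b.id AND a.x > 10, Scan(a), Scan(b))
    }
}
```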

Thanks anyway!

(In reply to Hequn Cheng, Tue, Jul 9, 2019 at 2:10 PM)

Re: Add Logical filter on a query plan from Flink Table API

Hequn Cheng
Hi Felipe,

Yes, we are short of such tutorials. You could take a look at the code for FLINK-9713 [1] (browsing the change log in an IDE is more convenient). It shows how to create a logical node, how to use a rule to convert it into a FlinkLogicalRel, and then how to convert that into a DataStream RelNode.
Hope this helps.
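The two conversion steps can be pictured as successive rule phases. Below is a toy sketch in plain Java; node names such as `MyLogicalNode` are hypothetical placeholders, not classes from FLINK-9713. Presumably the rule sets passed to addLogicalOptRuleSet() and addPhysicalOptRuleSet() play roughly the role of the two phases, but treat that mapping as an assumption.

```java
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Toy sketch: each phase is a set of "rules" mapping a node kind to the next one.
class ConversionPhasesDemo {
    static UnaryOperator<String> phase(Map<String, String> rules) {
        return kind -> {
            String out = rules.get(kind);
            if (out == null) {
                // Mirrors planning failing when no rule can convert a node.
                throw new IllegalStateException("no rule converts " + kind);
            }
            return out;
        };
    }

    public static void main(String[] args) {
        List<UnaryOperator<String>> phases = List.of(
            phase(Map.of("MyLogicalNode", "FlinkLogicalMyNode")),     // logical phase
            phase(Map.of("FlinkLogicalMyNode", "DataStreamMyNode"))); // physical phase
        String node = "MyLogicalNode";
        for (UnaryOperator<String> p : phases) {
            node = p.apply(node);
        }
        System.out.println(node); // DataStreamMyNode
    }
}
```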


(In reply to Felipe Gutierrez, Tue, Jul 9, 2019 at 9:37 PM)