LocalWatermarkAssigner causes predicate pushdown to be skipped

13 messages

Yuval Itzchakov
Hi,
Flink 1.12.1, Blink Planner, Scala 2.12

I have the following logical plan:

 LogicalSink(table=[default_catalog.default_database.table], fields=[bar, baz, hello_world, a, b])
+- LogicalProject(value=[$2], bar=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)], baz=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)], hello_world=[null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], a=[null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], b=[EMPTY_MAP()])
   +- LogicalFilter(condition=[AND(=($4, _UTF-16LE'bar'), =($34, _UTF-16LE'baz'))])
      +- LogicalWatermarkAssigner(rowtime=[bar], watermark=[$0])
         +- LogicalTableScan(table=[[default_catalog, default_database, foo]])

I have a custom source which creates a TableSchema based on an external table. When I create the schema, I push the watermark definition to the schema:

[image: watermark definition added to the TableSchema, not preserved]

When the HepPlanner starts the optimization phase and reaches "PushFilterIntoTableSourceScanRule", the rule matches on the LogicalFilter in the plan. However, because the RelOptRuleOperandChildPolicy is set to "SOME", it then attempts a full match on the child nodes. The rule is defined as follows:

[image: operand definition of the rule, not preserved]

The match on the child operand fails because the immediate child of the filter is a "LogicalWatermarkAssigner", not the "LogicalTableScan", which is the grandchild:

[image not preserved]
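A toy model can make the failed match concrete. The Java below is not Calcite or Flink code. `Node` and `matchesFilterOverScan` are hypothetical names. The operand shape is assumed from the description above, because the screenshot of the actual rule is not preserved: a LogicalFilter whose direct input must be a LogicalTableScan.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
 * Toy model (not Calcite/Flink code) of a two-level rule operand that expects
 * a LogicalFilter whose direct input is a LogicalTableScan.
 */
final class OperandMatchDemo {

    /** A minimal plan node: a node type name plus its inputs. */
    static final class Node {
        final String kind;
        final List<Node> inputs;

        Node(String kind, Node... inputs) {
            this.kind = kind;
            this.inputs = Collections.unmodifiableList(Arrays.asList(inputs));
        }
    }

    /**
     * Returns true only if the root is a LogicalFilter with exactly one input
     * and that input is a LogicalTableScan. Deeper descendants are never
     * inspected, so any node in between makes the match fail.
     */
    static boolean matchesFilterOverScan(Node root) {
        if (!root.kind.equals("LogicalFilter") || root.inputs.size() != 1) {
            return false;
        }
        return root.inputs.get(0).kind.equals("LogicalTableScan");
    }
}
```

For Filter(TableScan) the check returns true. Inserting a LogicalWatermarkAssigner between the two nodes, as in the plan above, makes it return false.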

Is this the desired behavior? Should I create the TableSchema without the rowtime attribute and use "SupportsWatermarkPushDown" to generate the watermark dynamically from the source record?

--
Best Regards,
Yuval Itzchakov.

Re: LocalWatermarkAssigner causes predicate pushdown to be skipped

Yuval Itzchakov
Update: When I don't set the watermark explicitly on the TableSchema, `applyWatermarkStrategy` never gets called on my ScanTableSource, which makes sense. But then what should be done? This feels a bit unintuitive.

--
Best Regards,
Yuval Itzchakov.

Re: LocalWatermarkAssigner causes predicate pushdown to be skipped

Roman Khachatryan
Hi Yuval,

I'm not familiar with the Blink planner, but Jark can probably help.

Regards,
Roman


Re: LocalWatermarkAssigner causes predicate pushdown to be skipped

Yuval Itzchakov
Hi Jark,
Would appreciate your help with this.

--
Best Regards,
Yuval Itzchakov.

Re: LocalWatermarkAssigner causes predicate pushdown to be skipped

Yuval Itzchakov
Bumping this up again. I would appreciate any help from anyone familiar with the Blink planner.

Thanks,
Yuval.

Re: LocalWatermarkAssigner causes predicate pushdown to be skipped

Timo Walther
Hi Yuval,

sorry that nobody replied earlier. Somehow your email fell through the
cracks.

If I understand you correctly, you would like to implement a table
source that implements both `SupportsWatermarkPushDown` and
`SupportsFilterPushDown`?

The current behavior might be on purpose. Filters and Watermarks are not
very compatible. Filtering would also mean that records (from which
watermarks could be generated) are skipped. If the filter is very
strict, we would not generate any new watermarks and the pipeline would
stop making progress in time.
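Here is a rough simulation of this concern. It makes two illustrative assumptions. First, the watermark follows a bounded-out-of-orderness style: the largest timestamp seen minus a fixed delay. Second, the generator only sees rows that survive a pushed-down filter. `Row` and `watermarkAfter` are hypothetical names, and this is not how Flink implements either ability.

```java
import java.util.function.Predicate;

/**
 * Toy simulation (not Flink code): a watermark generator that only sees rows
 * which survive a filter applied before it. The watermark is modeled as
 * "largest event timestamp seen minus a fixed delay".
 */
final class FilteredWatermarkDemo {

    /** A minimal row: a key that the filter inspects and an event timestamp in ms. */
    static final class Row {
        final String key;
        final long timestamp;

        Row(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    /**
     * Returns the watermark after all rows have been read, or Long.MIN_VALUE
     * if no row passed the filter (no watermark could be derived at all).
     */
    static long watermarkAfter(Row[] rows, Predicate<Row> filter, long delayMs) {
        long maxSeen = Long.MIN_VALUE;
        for (Row row : rows) {
            // Rows dropped by the filter never reach the generator,
            // so they cannot advance the watermark.
            if (filter.test(row)) {
                maxSeen = Math.max(maxSeen, row.timestamp);
            }
        }
        return maxSeen == Long.MIN_VALUE ? Long.MIN_VALUE : maxSeen - delayMs;
    }
}
```

Take the rows ("bar", 1000), ("other", 5000), ("other", 9000) with a delay of 100. Unfiltered, the watermark is 8900. A filter on key = "bar" leaves it at 900, even though event time has reached 9000.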

Watermark push down is only necessary if per-partition watermarks are
required. Otherwise the watermarks are generated in a subsequent
operator after the source. So you can still use rowtime without
implementing `SupportsWatermarkPushDown` in your custom source.
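This toy comparison shows why per-partition watermarks can matter. It uses plain Java with hypothetical names and is not Flink's implementation. It compares two snapshots taken after the listed records have been read:

- the minimum over per-partition watermarks;
- the watermark from a single generator that sees the already interleaved stream.

Both assume watermark = largest timestamp seen minus a fixed delay.

```java
/**
 * Toy comparison (not Flink code) of per-partition watermarks versus a single
 * watermark generator placed after the source. Watermark model: largest
 * timestamp seen minus a fixed delay.
 */
final class PerPartitionWatermarkDemo {

    /** Largest timestamp in one partition, or Long.MIN_VALUE if it is empty. */
    private static long maxOf(long[] timestamps) {
        long max = Long.MIN_VALUE;
        for (long ts : timestamps) {
            max = Math.max(max, ts);
        }
        return max;
    }

    /**
     * Per-partition watermarks: each partition tracks its own progress and the
     * emitted watermark is the minimum. A partition with no data yet holds the
     * result back (idleness handling is ignored in this sketch).
     */
    static long combinedWatermark(long[][] partitions, long delayMs) {
        if (partitions.length == 0) {
            return Long.MIN_VALUE;
        }
        long combined = Long.MAX_VALUE;
        for (long[] partition : partitions) {
            long max = maxOf(partition);
            if (max == Long.MIN_VALUE) {
                return Long.MIN_VALUE;
            }
            combined = Math.min(combined, max - delayMs);
        }
        return combined;
    }

    /** One generator after the source: it only sees the interleaved stream. */
    static long singleStreamWatermark(long[][] partitions, long delayMs) {
        long max = Long.MIN_VALUE;
        for (long[] partition : partitions) {
            max = Math.max(max, maxOf(partition));
        }
        return max == Long.MIN_VALUE ? Long.MIN_VALUE : max - delayMs;
    }
}
```

Take partitions {1000, 2000} and {9000} with a delay of 100. The per-partition result is 1900 and the single-stream result is 8900. A later record from the first partition with timestamp 2500 would therefore already be behind the single-stream watermark.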

I will loop in Shengkai, who worked on this topic recently.

Regards,
Timo


Re: LocalWatermarkAssigner causes predicate pushdown to be skipped

Yuval Itzchakov
Hi Timo,
After investigating this further, this is actually not related to implementing SupportsWatermarkPushDown.

Once I create a TableSchema for my custom source's RowData and assign it a watermark (see my example in the original mail), the plan always includes a LogicalWatermarkAssigner. This assigner sits between the LogicalTableScan and the LogicalFilter. It then prevents the HepPlanner from applying the optimization, because the rule requires the LogicalTableScan to be a direct child of the LogicalFilter. Since I have LogicalFilter -> LogicalWatermarkAssigner -> LogicalTableScan, this won't work.

--
Best Regards,
Yuval Itzchakov.

Re: LocalWatermarkAssigner causes predicate pushdown to be skipped

Jark Wu-3
Hi Yuval,

That's correct: you will always get a LogicalWatermarkAssigner if you assign a watermark.
If you implement SupportsWatermarkPushDown, the LogicalWatermarkAssigner will be pushed
into the TableSource. The Filter can then be pushed into the source if the source implements SupportsFilterPushDown.
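The sequence described here can be sketched with two toy rewrite rules over a tiny plan tree. This is plain Java with hypothetical names (`pushWatermark`, `pushFilter`, `optimize`), not the planner's real rules. The filter rule is assumed to match only a Filter whose direct input is a TableScan. The sketch also shows that the result depends on the order in which the two rules run.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.UnaryOperator;

/**
 * Toy rewrite rules (not the planner's real rules) over a tiny plan tree.
 * A TableScan node records which abilities have been pushed into it.
 */
final class RuleOrderDemo {

    static final class Node {
        final String kind;          // "Filter", "WatermarkAssigner" or "TableScan"
        final List<String> pushed;  // abilities absorbed by a TableScan
        final List<Node> inputs;

        Node(String kind, Node... inputs) {
            this(kind, Collections.<String>emptyList(), inputs);
        }

        Node(String kind, List<String> pushed, Node... inputs) {
            this.kind = kind;
            this.pushed = Collections.unmodifiableList(new ArrayList<>(pushed));
            this.inputs = Collections.unmodifiableList(Arrays.asList(inputs));
        }
    }

    /** Returns a new TableScan carrying the old scan's abilities plus one more. */
    private static Node scanWith(Node scan, String ability) {
        List<String> abilities = new ArrayList<>(scan.pushed);
        abilities.add(ability);
        return new Node("TableScan", abilities);
    }

    /** True if n is a `kind` node whose single direct input is a `childKind` node. */
    private static boolean isOver(Node n, String kind, String childKind) {
        return n.kind.equals(kind) && n.inputs.size() == 1
                && n.inputs.get(0).kind.equals(childKind);
    }

    /** Watermark pushdown: WatermarkAssigner(TableScan) -> TableScan[+watermark]. */
    static Node pushWatermark(Node n) {
        return isOver(n, "WatermarkAssigner", "TableScan") ? scanWith(n.inputs.get(0), "watermark") : n;
    }

    /** Filter pushdown: matches only a Filter whose direct input is a TableScan. */
    static Node pushFilter(Node n) {
        return isOver(n, "Filter", "TableScan") ? scanWith(n.inputs.get(0), "filter") : n;
    }

    /** Applies one rule bottom-up over the whole tree in a single pass. */
    static Node applyEverywhere(Node n, UnaryOperator<Node> rule) {
        Node[] newInputs = new Node[n.inputs.size()];
        for (int i = 0; i < newInputs.length; i++) {
            newInputs[i] = applyEverywhere(n.inputs.get(i), rule);
        }
        return rule.apply(new Node(n.kind, n.pushed, newInputs));
    }

    /** Runs two rules as separate phases: the first over the whole tree, then the second. */
    static Node optimize(Node plan, UnaryOperator<Node> first, UnaryOperator<Node> second) {
        return applyEverywhere(applyEverywhere(plan, first), second);
    }
}
```

Start from Filter(WatermarkAssigner(TableScan)). Running `pushWatermark` before `pushFilter` collapses the plan into a single TableScan carrying both abilities. Running them in the opposite order leaves the Filter above a TableScan that carries only the watermark.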

Best,
Jark

Re: LocalWatermarkAssigner causes predicate pushdown to be skipped

Yuval Itzchakov
Hi Jark,

Even after implementing both, I don't see the watermark being pushed into the table source in the logical plan, and the LogicalWatermarkAssigner still prevents predicate pushdown from running.

On Sun, Mar 7, 2021, 15:43 Jark Wu <[hidden email]> wrote:
Hi Yuval,

That's correct you will always get a LogicalWatermarkAssigner if you assigned a watermark. 
If you implement SupportsWatermarkPushdown, the LogicalWatermarkAssigner will be pushed 
into TableSource, and then you can push Filter into source if source implement SupportsFilterPushdown. 

Best,
Jark

On Sat, 6 Mar 2021 at 01:16, Yuval Itzchakov <[hidden email]> wrote:
Hi Timo,
After investigating this further, this is actually non related to implementing SupportsWatermarkPushdown.

Once I create a TableSchema for my custom source's RowData, and assign it a watermark (see my example in the original mail), the plan will always include a LogicalWatermarkAssigner. This assigner that is between the LogicalTableScan and the LogicalFilter will then go on and fail the HepPlanner from invoking the optimization since it requires LogicalTableScan to be a direct child of LogicalFilter. Since I have LogicalFilter -> LogicalWatermarkAssigner -> LogicalTableScan, this won't work.

On Fri, Mar 5, 2021 at 5:59 PM Timo Walther <[hidden email]> wrote:
Hi Yuval,

sorry that nobody replied earlier. Somehow your email fell through the
cracks.

If I understand you correctly, could would like to implement a table
source that implements both `SupportsWatermarkPushDown` and
`SupportsFilterPushDown`?

The current behavior might be on purpose. Filters and Watermarks are not
very compatible. Filtering would also mean that records (from which
watermarks could be generated) are skipped. If the filter is very
strict, we would not generate any new watermarks and the pipeline would
stop making progress in time.

Watermark push down is only necessary, if per-partition watermarks are
required. Otherwise the watermarks are generated in a subsequent
operator after the source. So you can still use rowtime without
implementing `SupportsWatermarkPushDown` in your custom source.

I will lookp in Shengkai who worked on this topic recently.

Regards,
Timo


On 04.03.21 18:52, Yuval Itzchakov wrote:
> Bumping this up again, would appreciate any help if anyone is familiar
> with the blink planner.
>
> Thanks,
> Yuval.
>
> On Fri, Feb 26, 2021, 18:53 Yuval Itzchakov <[hidden email]
> <mailto:[hidden email]>> wrote:
>
>     Hi Jark,
>     Would appreciate your help with this.
>
>     On Wed, Feb 24, 2021 at 12:09 PM Roman Khachatryan <[hidden email]
>     <mailto:[hidden email]>> wrote:
>
>         Hi Yuval,
>
>         I'm not familiar with the Blink planner but probably Jark can help.
>
>         Regards,
>         Roman
>
>
>         On Sun, Feb 21, 2021 at 6:52 PM Yuval Itzchakov
>         <[hidden email] <mailto:[hidden email]>> wrote:
>
>             Update: When I don't set the watermark explicitly on the
>             TableSchema, `applyWatermarkStrategy` never gets called on
>             my ScanTableSource, which does make sense. But now the
>             question is what should be done? This feels a bit unintuitive.
>
>             On Sun, Feb 21, 2021 at 7:09 PM Yuval Itzchakov
>             <[hidden email] <mailto:[hidden email]>> wrote:
>
>                 Hi,
>                 Flink 1.12.1, Blink Planner, Scala 2.12
>
>                 I have the following logical plan:
>
>                   LogicalSink(table=[default_catalog.default_database.table], fields=[bar, baz, hello_world, a, b])
>                 +- LogicalProject(value=[$2],
>                 bar=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)],
>                 baz=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)],
>                 hello_world=[null:VARCHAR(2147483647) CHARACTER SET
>                 "UTF-16LE"], a=[null:VARCHAR(2147483647) CHARACTER SET
>                 "UTF-16LE"], b=[EMPTY_MAP()])
>                     +- LogicalFilter(condition=[AND(=($4,
>                 _UTF-16LE'bar'), =($34, _UTF-16LE'baz'))])
>                        +- LogicalWatermarkAssigner(rowtime=[bar],
>                 watermark=[$0])
>                           +- LogicalTableScan(table=[[default_catalog,
>                 default_database, foo]])
>
>                 I have a custom source which creates a TableSchema based
>                 on an external table. When I create the schema, I push
>                 the watermark definition to the schema:
>
>                 image.png
>
>                 When the HepPlanner starts the optimization phase and
>                 reaches the "PushFilterInotTableSourceScanRule", it
>                 matches on the LogicalFilter in the definition. But
>                 then, since the RelOptRuleOperandChildPolicy is set to
>                 "SOME", it attempts to do a full match on the child
>                 nodes. Since the rule is defined as so:
>
>                 image.png
>
>                 The child filter fails since the immediate child of the
>                 filter is a "LocalWatermarkAssigner", and not the
>                 "LogicalTableScan" which is the grandchild:
>
>                 image.png
>
>                 Is this the desired behavior? Should I create the
>                 TableSchema without the row time attribute and use
>                 "SupportsWatermarkPushdown" to generate the watermark
>                 dynamically from the source record?
>
>                 --
>                 Best Regards,
>                 Yuval Itzchakov.
>
>
>
>             --
>             Best Regards,
>             Yuval Itzchakov.
>
>
>
>     --
>     Best Regards,
>     Yuval Itzchakov.
>



--
Best Regards,
Yuval Itzchakov.
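
The matching failure described in the original post can be reproduced with a toy model. This is a minimal sketch, not Calcite's or Flink's API: it assumes a single-input plan written as a list of operator names from root to leaf, and treats a rule pattern as a sequence of operands that must line up with consecutive direct children, which is roughly what an operand tree with child policy SOME demands. It also assumes, as the thread describes, that the filter pushdown rule asks for a LogicalTableScan directly under the LogicalFilter.

```java
import java.util.List;

// Toy model, not Calcite's API: a single-input plan is a list of operator
// names ordered from the root down to the leaf.
class OperandMatchSketch {

    // True if every operand in `pattern` matches the node at the same depth,
    // starting at `start`. No node may be skipped, which mirrors a rule whose
    // child operand must be the direct input of its parent.
    static boolean matchesAt(List<String> plan, List<String> pattern, int start) {
        if (start + pattern.size() > plan.size()) {
            return false; // not enough nodes left below `start`
        }
        for (int i = 0; i < pattern.size(); i++) {
            if (!plan.get(start + i).equals(pattern.get(i))) {
                return false;
            }
        }
        return true;
    }

    // Tries the pattern at every node, like a planner visiting the whole tree.
    static boolean matchesAnywhere(List<String> plan, List<String> pattern) {
        for (int start = 0; start < plan.size(); start++) {
            if (matchesAt(plan, pattern, start)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Shape the filter pushdown rule is assumed to require.
        List<String> filterOnScan = List.of("LogicalFilter", "LogicalTableScan");

        List<String> withoutWatermark =
                List.of("LogicalProject", "LogicalFilter", "LogicalTableScan");
        List<String> withWatermark =
                List.of("LogicalProject", "LogicalFilter",
                        "LogicalWatermarkAssigner", "LogicalTableScan");

        System.out.println(matchesAnywhere(withoutWatermark, filterOnScan)); // true
        System.out.println(matchesAnywhere(withWatermark, filterOnScan));    // false
    }
}
```

The only difference between the two plans is the extra assigner node, and that alone is enough to make the two-operand pattern miss.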

Re: LocalWatermarkAssigner causes predicate pushdown to be skipped

Shengkai Fang
Hi, Yuval, Jark, Timo. 

Currently the watermark push down happens in the logical rewrite phase, but the filter push down happens in the logical phase, which runs first. This means the planner tries the filter push down before the watermark push down, so the watermark assigner is still sitting between the filter and the table scan when the filter rule runs.
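
To illustrate why the phase order matters, here is a hypothetical sketch in plain Java (not Flink's optimizer programs or rule sets) that applies two rewrite phases to a plan written as a list of operator names, root first. Each phase only fires when its operator sits directly on the scan; the "+filter" and "+watermark" suffixes are an invented notation for abilities absorbed by the source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Toy model, not Flink's optimizer programs: a single-input plan is a list of
// operator names, root first. A name starting with "Scan" is the source; the
// "+filter" / "+watermark" suffixes mark abilities pushed into it.
class PhaseOrderSketch {

    // Absorbs `operator` into the scan, but only if the scan is its direct child.
    static List<String> pushInto(List<String> plan, String operator, String ability) {
        List<String> out = new ArrayList<>(plan);
        for (int i = 0; i + 1 < out.size(); i++) {
            if (out.get(i).equals(operator) && out.get(i + 1).startsWith("Scan")) {
                String scan = out.get(i + 1);
                out.remove(i);                    // the operator node disappears
                out.set(i, scan + "+" + ability); // and the scan takes over its job
                return out;
            }
        }
        return out; // no match: the plan is left unchanged
    }

    static List<String> filterPushdown(List<String> plan) {
        return pushInto(plan, "Filter", "filter");
    }

    static List<String> watermarkPushdown(List<String> plan) {
        return pushInto(plan, "WatermarkAssigner", "watermark");
    }

    // Applies each phase once, in the given order.
    static List<String> optimize(List<String> plan, List<UnaryOperator<List<String>>> phases) {
        List<String> current = plan;
        for (UnaryOperator<List<String>> phase : phases) {
            current = phase.apply(current);
        }
        return current;
    }

    public static void main(String[] args) {
        List<String> plan = List.of("Filter", "WatermarkAssigner", "Scan");

        // Order described above: filter pushdown before watermark pushdown.
        List<UnaryOperator<List<String>>> filterFirst =
                List.of(PhaseOrderSketch::filterPushdown, PhaseOrderSketch::watermarkPushdown);
        // Reversed order, for comparison.
        List<UnaryOperator<List<String>>> watermarkFirst =
                List.of(PhaseOrderSketch::watermarkPushdown, PhaseOrderSketch::filterPushdown);

        System.out.println(optimize(plan, filterFirst));    // [Filter, Scan+watermark]
        System.out.println(optimize(plan, watermarkFirst)); // [Scan+watermark+filter]
    }
}
```

With filter pushdown first, the filter phase sees the assigner in the way and does nothing, so the filter stays in the plan even though the watermark is later pushed. The sketch deliberately ignores whether the combined source then generates watermarks before or after filtering, which is a separate semantic question.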

I think we need either a rule that transposes the filter and the watermark assigner, or an extension of the filter push down rule that also captures the structure where the watermark assigner is the parent of the table scan.
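
Both options can be sketched on the same kind of toy plan (operator names as strings, root first). These are hypothetical helpers, not Flink's actual rule implementations: transposeFilterAndWatermark swaps the two nodes so that ordinary filter pushdown can fire, while pushFilterThroughWatermark matches the three-node shape directly.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the two suggested fixes, on a single-input plan written as
// operator names (root first). Hypothetical helpers, not Flink rules.
class FilterWatermarkRulesSketch {

    // Ordinary filter pushdown: fires only when the scan is the filter's direct child.
    static List<String> pushFilter(List<String> plan) {
        List<String> out = new ArrayList<>(plan);
        for (int i = 0; i + 1 < out.size(); i++) {
            if (out.get(i).equals("Filter") && out.get(i + 1).startsWith("Scan")) {
                String scan = out.get(i + 1);
                out.remove(i);                // drop the Filter node
                out.set(i, scan + "+filter"); // the scan now applies the predicate
                return out;
            }
        }
        return out;
    }

    // Option 1: transpose Filter and WatermarkAssigner so pushFilter can fire.
    static List<String> transposeFilterAndWatermark(List<String> plan) {
        List<String> out = new ArrayList<>(plan);
        for (int i = 0; i + 1 < out.size(); i++) {
            if (out.get(i).equals("Filter") && out.get(i + 1).equals("WatermarkAssigner")) {
                out.set(i, "WatermarkAssigner");
                out.set(i + 1, "Filter");
                return out;
            }
        }
        return out;
    }

    // Option 2: match Filter -> WatermarkAssigner -> Scan directly and push the
    // predicate into the scan, leaving the assigner where it is.
    static List<String> pushFilterThroughWatermark(List<String> plan) {
        List<String> out = new ArrayList<>(plan);
        for (int i = 0; i + 2 < out.size(); i++) {
            if (out.get(i).equals("Filter")
                    && out.get(i + 1).equals("WatermarkAssigner")
                    && out.get(i + 2).startsWith("Scan")) {
                out.set(i + 2, out.get(i + 2) + "+filter");
                out.remove(i);
                return out;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> plan = List.of("Filter", "WatermarkAssigner", "Scan");

        System.out.println(pushFilter(plan));                              // [Filter, WatermarkAssigner, Scan]
        System.out.println(pushFilter(transposeFilterAndWatermark(plan))); // [WatermarkAssigner, Scan+filter]
        System.out.println(pushFilterThroughWatermark(plan));              // [WatermarkAssigner, Scan+filter]
    }
}
```

Both paths end in the same shape. Either way, the watermark generator now only sees rows that pass the filter, so a very selective predicate could hold back event time; a real rule would have to decide whether that trade-off is acceptable.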

Best, 
Shengkai


Re: LocalWatermarkAssigner causes predicate pushdown to be skipped

Yuval Itzchakov
Thank you Shengkai,
That does explain what I'm seeing.

Jark / Shengkai - Is there any workaround to get Flink to use both watermark pushdown and predicate pushdown until this is resolved?

--
Best Regards,
Yuval Itzchakov.

Re: LocalWatermarkAssigner causes predicate pushdown to be skipped

Shengkai Fang
Hi, Yuval. 

I have opened a ticket about this[1], but I don't think we have a solution yet.

Do you have time to help us solve this? I think it will take too much time.



Re: LocalWatermarkAssigner causes predicate pushdown to be skipped

Shengkai Fang
Sorry for the typo...

I mean it will not take too much time. 

Best,
Shengkai
