Hello,
We seem to be facing an issue with Flink where the physical plan produced by the planner's optimization is incorrect.
I have been able to reproduce the issue in the following "simplified" use case (it doesn't seem to happen in trivial cases):
You can find the SQL query below [1].
After activating debug logging for Calcite, I can see that the original logical plan is valid: the top-level UNION is between two LogicalProjects, one for the 2-hour and one for the 6-hour HOP window [2].
However, in the final physical plan, both sides of the UNION now have 6-hour HOP windows instead of one 2-hour and one 6-hour window [3].
I pushed a commit to my fork to reproduce the issue: https://github.com/BenoitHanotte/flink/commit/3d388f153b44bb35b57b8400407ff24a2e91661f; unfortunately, simplifying the query further seems to make the issue disappear.
Is there anything obvious I am missing, or do you have any pointers as to what could trigger this issue? I looked at the different rules applied by the planner [4] but, as I am not familiar with them, I have not yet been able to find the root cause.
Thanks a lot for your help!
Benoit Hanotte
********************************* [1] SQL query *********************************
WITH displays AS (
    SELECT `timestamp`, 1 as nb_displays, 0 as nb_clicks FROM my_catalog.my_db.display
),
clicks AS (
    SELECT `timestamp`, 0 as nb_displays, 1 as nb_clicks FROM my_catalog.my_db.click
),
counts_2h AS (
    SELECT
        SUM(nb_clicks) / SUM(nb_displays) as ctr,
        HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR) as `timestamp`
    FROM (
        (SELECT * FROM displays)
        UNION ALL
        (SELECT * FROM clicks)
    ) t
    GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
),
counts_6h AS (
    SELECT
        SUM(nb_clicks) / SUM(nb_displays) as ctr,
        HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '6' HOUR) as `timestamp`
    FROM (
        (SELECT * FROM displays)
        UNION ALL
        (SELECT * FROM clicks)
    ) t
    GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '6' HOUR)
)
SELECT
    TUMBLE_END(`timestamp`, INTERVAL '1' HOUR) as `timestamp`,
    MAX(ctr)
FROM (
    (SELECT * FROM counts_6h)
    UNION ALL
    (SELECT * FROM counts_2h)
) t
GROUP BY TUMBLE(`timestamp`, INTERVAL '1' HOUR)
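For completeness, here is a minimal sketch of how I run a query like this against the legacy planner and inspect its plan. This is a hypothetical setup, not my actual reproduction code: the object name, the `query` placeholder, and the table registration are assumptions.

// Hypothetical setup for running the query on the legacy (pre-Blink) planner.
// Assumes Flink 1.9/1.10 with flink-table-planner and the Scala API on the
// classpath, and that the display and click tables are already registered in
// my_catalog.my_db with a rowtime attribute named `timestamp`.
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment

object Repro extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val settings = EnvironmentSettings.newInstance()
    .useOldPlanner()    // the legacy planner where the issue shows up
    .inStreamingMode()
    .build()
  val tEnv = StreamTableEnvironment.create(env, settings)

  val query: String = "..." // the full SQL from [1]
  val result = tEnv.sqlQuery(query)
  println(tEnv.explain(result)) // prints the abstract syntax tree and the physical plan
}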
********************* [2] Logical plan (before optimization) ***********************
LogicalProject(timestamp=[TUMBLE_END($0)], EXPR$1=[$1])
  LogicalAggregate(group=[{0}], EXPR$1=[MAX($1)])
    LogicalProject($f0=[TUMBLE($1, 3600000:INTERVAL HOUR)], ctr=[$0])
      LogicalUnion(all=[true])
        LogicalProject(ctr=[$0], timestamp=[$1])
          LogicalProject(ctr=[/($1, $2)], timestamp=[HOP_ROWTIME($0)])
            LogicalAggregate(group=[{0}], agg#0=[SUM($1)], agg#1=[SUM($2)])
              LogicalProject($f0=[HOP($0, 3600000:INTERVAL HOUR, 21600000:INTERVAL HOUR)], nb_clicks=[$2], nb_displays=[$1])
                LogicalProject(timestamp=[$0], nb_displays=[0], nb_clicks=[1])
                  LogicalTableScan(table=[[my_catalog, my_db, click]])
        LogicalProject(ctr=[$0], timestamp=[$1])
          LogicalProject(ctr=[/($1, $2)], timestamp=[HOP_ROWTIME($0)])
            LogicalAggregate(group=[{0}], agg#0=[SUM($1)], agg#1=[SUM($2)])
              LogicalProject($f0=[HOP($0, 3600000:INTERVAL HOUR, 7200000:INTERVAL HOUR)], nb_clicks=[$2], nb_displays=[$1])
                LogicalProject(timestamp=[$0], nb_displays=[1], nb_clicks=[0])
                  LogicalTableScan(table=[[my_catalog, my_db, display]])
****************** [3] Resulting physical plan (after optimization) ********************
DataStreamCalc(select=[w$end AS timestamp, CAST(EXPR$1) AS EXPR$1]): rowcount = 400.0, cumulative cost = {3200.0 rows, 3600.0 cpu, 4800.0 io}, id = 556
  DataStreamGroupWindowAggregate(window=[TumblingGroupWindow('w$, 'timestamp, 3600000.millis)], select=[MAX(ctr) AS EXPR$1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 400.0, cumulative cost = {2800.0 rows, 3200.0 cpu, 4800.0 io}, id = 555
    DataStreamUnion(all=[true], union all=[ctr, timestamp]): rowcount = 400.0, cumulative cost = {2400.0 rows, 2800.0 cpu, 4800.0 io}, id = 554
      DataStreamCalc(select=[/(CAST($f0), CAST($f1)) AS ctr, w$rowtime AS timestamp]): rowcount = 200.0, cumulative cost = {1000.0 rows, 1200.0 cpu, 2400.0 io}, id = 548
        DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], select=[SUM(nb_clicks) AS $f0, SUM(nb_displays) AS $f1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 200.0, cumulative cost = {800.0 rows, 800.0 cpu, 2400.0 io}, id = 547
          DataStreamUnion(all=[true], union all=[timestamp, nb_displays, nb_clicks]): rowcount = 200.0, cumulative cost = {600.0 rows, 600.0 cpu, 2400.0 io}, id = 546
            DataStreamCalc(select=[timestamp, 1 AS nb_displays, 0 AS nb_clicks]): rowcount = 100.0, cumulative cost = {200.0 rows, 200.0 cpu, 1200.0 io}, id = 544
              StreamTableSourceScan(table=[[my_catalog, my_db, display]], fields=[timestamp], source=[$anon$1(...)]): rowcount = 100.0, cumulative cost = {100.0 rows, 100.0 cpu, 1200.0 io}, id = 543
            DataStreamCalc(select=[timestamp, 0 AS nb_displays, 1 AS nb_clicks]): rowcount = 100.0, cumulative cost = {200.0 rows, 200.0 cpu, 1200.0 io}, id = 545
              StreamTableSourceScan(table=[[my_catalog, my_db, click]], fields=[timestamp], source=[$anon$1(...)]): rowcount = 100.0, cumulative cost = {100.0 rows, 100.0 cpu, 1200.0 io}, id = 542
      DataStreamCalc(select=[/(CAST($f0), CAST($f1)) AS ctr, w$rowtime AS timestamp]): rowcount = 200.0, cumulative cost = {1000.0 rows, 1200.0 cpu, 2400.0 io}, id = 553
        DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], select=[SUM(nb_clicks) AS $f0, SUM(nb_displays) AS $f1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 200.0, cumulative cost = {800.0 rows, 800.0 cpu, 2400.0 io}, id = 552
          DataStreamUnion(all=[true], union all=[timestamp, nb_displays, nb_clicks]): rowcount = 200.0, cumulative cost = {600.0 rows, 600.0 cpu, 2400.0 io}, id = 551
            DataStreamCalc(select=[timestamp, 1 AS nb_displays, 0 AS nb_clicks]): rowcount = 100.0, cumulative cost = {200.0 rows, 200.0 cpu, 1200.0 io}, id = 549
              StreamTableSourceScan(table=[[my_catalog, my_db, display]], fields=[timestamp], source=[$anon$1(...)]): rowcount = 100.0, cumulative cost = {100.0 rows, 100.0 cpu, 1200.0 io}, id = 543
            DataStreamCalc(select=[timestamp, 0 AS nb_displays, 1 AS nb_clicks]): rowcount = 100.0, cumulative cost = {200.0 rows, 200.0 cpu, 1200.0 io}, id = 550
              StreamTableSourceScan(table=[[my_catalog, my_db, click]], fields=[timestamp], source=[$anon$1(...)]): rowcount = 100.0, cumulative cost = {100.0 rows, 100.0 cpu, 1200.0 io}, id = 542
Hi Benoit,
thanks for reporting this issue. Since I'm not too familiar with the SQL component I've pulled in Timo and Jingsong who know much better what could be wrong than I do.
Cheers,
Till
Hello Till,
thanks for your reply!
I have been able to debug the issue and reported it in https://issues.apache.org/jira/browse/FLINK-15577.
It seems the old planner does not add the window specs to the logical nodes' digests, leading the HepPlanner to consider the aggregations equivalent when they are not, since they use different time windows. I explained the issue in more detail in the ticket above, and submitted a PR earlier today: https://github.com/apache/flink/pull/10854.
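To make the mechanism concrete, here is a minimal, self-contained Scala sketch. It is purely illustrative, every name in it is made up, and it is not the Flink or Calcite code: the HepPlanner considers two relational expressions equivalent when their digest strings match, so a digest that omits the window specification collapses the 2-hour and 6-hour aggregations into a single node.

// Illustrative sketch only: mimics a planner that deduplicates nodes by a
// digest string. All types and names here are hypothetical.
case class WindowSpec(slideMs: Long, sizeMs: Long)

case class WindowAggregate(input: String, groupKey: String, window: WindowSpec) {
  // Buggy digest: the window spec is missing, so a 2h and a 6h HOP over the
  // same input produce identical digests.
  def buggyDigest: String = s"WindowAggregate(input=$input, group=$groupKey)"

  // Fixed digest: including the window spec keeps the two aggregates distinct.
  def fixedDigest: String = s"WindowAggregate(input=$input, group=$groupKey, window=$window)"
}

object DigestCollision extends App {
  val agg2h = WindowAggregate("union", "timestamp", WindowSpec(3600000L, 7200000L))
  val agg6h = WindowAggregate("union", "timestamp", WindowSpec(3600000L, 21600000L))

  // A digest-keyed map, as a stand-in for the planner's equivalence check:
  // with the buggy digest both aggregates land on the same key, and one of
  // the two windows silently replaces the other.
  val buggy = Map(agg2h.buggyDigest -> agg2h, agg6h.buggyDigest -> agg6h)
  val fixed = Map(agg2h.fixedDigest -> agg2h, agg6h.fixedDigest -> agg6h)

  println(s"buggy digests -> ${buggy.size} distinct node(s)") // 1: windows merged
  println(s"fixed digests -> ${fixed.size} distinct node(s)") // 2: windows kept apart
}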
Benoit
Great, thanks a lot for looking into the problem and fixing it. I assume that your PR will be merged very soon.
Cheers,
Till