Hello,
We seem to be facing an issue with Flink where the physical plan after planner optimization is not correct.
I have been able to reproduce the issue with the following "simplified" use case (it does not seem to happen in trivial cases):
- We open two event streams ("clicks" and "displays").
- We compute the click-through rate (ctr) over 2-hour and 6-hour sliding windows.
- We then union both results to output one row per hour containing the maximum of the values computed over the 2-hour and 6-hour windows.
You can find the SQL query below [1].
After enabling debug logging for Calcite, I can see that the original logical plan is valid: the top-level UNION is between two LogicalProjects, one over the 2-hour HOP window and one over the 6-hour HOP window [2].
However, in the final physical plan, both sides of the UNION now use a 2-hour HOP window (SlidingGroupWindow of 7200000.millis) instead of one 2-hour and one 6-hour window [3].
Is there anything obvious I am missing, or do you have any pointers to what could trigger this issue? I looked at the different rules applied by the planner [4], but as I am not familiar with them, I have not yet been able to find the root cause.
Thanks a lot for your help!
Benoit Hanotte
********************************* [1] SQL query *********************************
WITH displays AS (
  SELECT `timestamp`, 1 as nb_displays, 0 as nb_clicks FROM my_catalog.my_db.display
),
clicks AS (
  SELECT `timestamp`, 0 as nb_displays, 1 as nb_clicks FROM my_catalog.my_db.click
),
counts_2h AS (
  SELECT
    SUM(nb_clicks) / SUM(nb_displays) as ctr,
    HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR) as `timestamp`
  FROM (
    (SELECT * FROM displays)
    UNION ALL
    (SELECT * FROM clicks)
  ) t
  GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
),
counts_6h AS (
  SELECT
    SUM(nb_clicks) / SUM(nb_displays) as ctr,
    HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '6' HOUR) as `timestamp`
  FROM (
    (SELECT * FROM displays)
    UNION ALL
    (SELECT * FROM clicks)
  ) t
  GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '6' HOUR)
)
SELECT
  TUMBLE_END(`timestamp`, INTERVAL '1' HOUR) as `timestamp`,
  MAX(ctr)
FROM (
  (SELECT * FROM counts_6h)
  UNION ALL
  (SELECT * FROM counts_2h)
) t
GROUP BY TUMBLE(`timestamp`, INTERVAL '1' HOUR)
********************* [2] Logical plan (before optimization) ***********************
LogicalProject(timestamp=[TUMBLE_END($0)], EXPR$1=[$1])
  LogicalAggregate(group=[{0}], EXPR$1=[MAX($1)])
    LogicalProject($f0=[TUMBLE($1, 3600000:INTERVAL HOUR)], ctr=[$0])
      LogicalUnion(all=[true])
        LogicalProject(ctr=[$0], timestamp=[$1])
          LogicalProject(ctr=[/($1, $2)], timestamp=[HOP_ROWTIME($0)])
            LogicalAggregate(group=[{0}], agg#0=[SUM($1)], agg#1=[SUM($2)])
              LogicalProject($f0=[HOP($0, 3600000:INTERVAL HOUR, 21600000:INTERVAL HOUR)], nb_clicks=[$2], nb_displays=[$1])
                LogicalProject(timestamp=[$0], nb_displays=[0], nb_clicks=[1])
                  LogicalTableScan(table=[[my_catalog, my_db, click]])
        LogicalProject(ctr=[$0], timestamp=[$1])
          LogicalProject(ctr=[/($1, $2)], timestamp=[HOP_ROWTIME($0)])
            LogicalAggregate(group=[{0}], agg#0=[SUM($1)], agg#1=[SUM($2)])
              LogicalProject($f0=[HOP($0, 3600000:INTERVAL HOUR, 7200000:INTERVAL HOUR)], nb_clicks=[$2], nb_displays=[$1])
                LogicalProject(timestamp=[$0], nb_displays=[1], nb_clicks=[0])
                  LogicalTableScan(table=[[my_catalog, my_db, display]])
****************** [3] Resulting physical plan (after optimization) ********************
DataStreamCalc(select=[w$end AS timestamp, CAST(EXPR$1) AS EXPR$1]): rowcount = 400.0, cumulative cost = {3200.0 rows, 3600.0 cpu, 4800.0 io}, id = 556
  DataStreamGroupWindowAggregate(window=[TumblingGroupWindow('w$, 'timestamp, 3600000.millis)], select=[MAX(ctr) AS EXPR$1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 400.0, cumulative cost = {2800.0 rows, 3200.0 cpu, 4800.0 io}, id = 555
    DataStreamUnion(all=[true], union all=[ctr, timestamp]): rowcount = 400.0, cumulative cost = {2400.0 rows, 2800.0 cpu, 4800.0 io}, id = 554
      DataStreamCalc(select=[/(CAST($f0), CAST($f1)) AS ctr, w$rowtime AS timestamp]): rowcount = 200.0, cumulative cost = {1000.0 rows, 1200.0 cpu, 2400.0 io}, id = 548
        DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], select=[SUM(nb_clicks) AS $f0, SUM(nb_displays) AS $f1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 200.0, cumulative cost = {800.0 rows, 800.0 cpu, 2400.0 io}, id = 547
          DataStreamUnion(all=[true], union all=[timestamp, nb_displays, nb_clicks]): rowcount = 200.0, cumulative cost = {600.0 rows, 600.0 cpu, 2400.0 io}, id = 546
            DataStreamCalc(select=[timestamp, 1 AS nb_displays, 0 AS nb_clicks]): rowcount = 100.0, cumulative cost = {200.0 rows, 200.0 cpu, 1200.0 io}, id = 544
              StreamTableSourceScan(table=[[my_catalog, my_db, display]], fields=[timestamp], source=[$anon$1(...)]): rowcount = 100.0, cumulative cost = {100.0 rows, 100.0 cpu, 1200.0 io}, id = 543
            DataStreamCalc(select=[timestamp, 0 AS nb_displays, 1 AS nb_clicks]): rowcount = 100.0, cumulative cost = {200.0 rows, 200.0 cpu, 1200.0 io}, id = 545
              StreamTableSourceScan(table=[[my_catalog, my_db, click]], fields=[timestamp], source=[$anon$1(...)]): rowcount = 100.0, cumulative cost = {100.0 rows, 100.0 cpu, 1200.0 io}, id = 542
      DataStreamCalc(select=[/(CAST($f0), CAST($f1)) AS ctr, w$rowtime AS timestamp]): rowcount = 200.0, cumulative cost = {1000.0 rows, 1200.0 cpu, 2400.0 io}, id = 553
        DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], select=[SUM(nb_clicks) AS $f0, SUM(nb_displays) AS $f1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 200.0, cumulative cost = {800.0 rows, 800.0 cpu, 2400.0 io}, id = 552
          DataStreamUnion(all=[true], union all=[timestamp, nb_displays, nb_clicks]): rowcount = 200.0, cumulative cost = {600.0 rows, 600.0 cpu, 2400.0 io}, id = 551
            DataStreamCalc(select=[timestamp, 1 AS nb_displays, 0 AS nb_clicks]): rowcount = 100.0, cumulative cost = {200.0 rows, 200.0 cpu, 1200.0 io}, id = 549
              StreamTableSourceScan(table=[[my_catalog, my_db, display]], fields=[timestamp], source=[$anon$1(...)]): rowcount = 100.0, cumulative cost = {100.0 rows, 100.0 cpu, 1200.0 io}, id = 543
            DataStreamCalc(select=[timestamp, 0 AS nb_displays, 1 AS nb_clicks]): rowcount = 100.0, cumulative cost = {200.0 rows, 200.0 cpu, 1200.0 io}, id = 550
              StreamTableSourceScan(table=[[my_catalog, my_db, click]], fields=[timestamp], source=[$anon$1(...)]): rowcount = 100.0, cumulative cost = {100.0 rows, 100.0 cpu, 1200.0 io}, id = 542