Incorrect Physical Plan when unioning two different windows, giving incorrect SQL query results

Benoit Hanotte
Hello,

We seem to be facing an issue with Flink where the physical plan after planner optimization is not correct.
I have been able to reproduce the issue in the following "simplified" use case (it doesn't seem to happen in trivial cases):
  1. We open 2 event streams ("clicks" and "displays")
  2. We compute the click-through rate (ctr) over 2-hour and 6-hour sliding windows.
  3. We then union the two results and output one row per hour, holding the maximum of the values computed over the 2-hour and 6-hour windows.
You can find the SQL query below [1].
After activating debug logging for Calcite, I can see that the original logical plan is valid: the top-level UNION is between two LogicalProjects, one for the 2-hour and one for the 6-hour HOP window [2].
However, in the final physical plan, both sides of the UNION end up with the same HOP window (2 hours, according to the SlidingGroupWindow entries in [3]) instead of one 2-hour window and one 6-hour window.

I pushed a commit to my fork to reproduce the issue: https://github.com/BenoitHanotte/flink/commit/3d388f153b44bb35b57b8400407ff24a2e91661f. Unfortunately, simplifying the query seems to make the issue disappear.

Is there anything obvious I am missing, or do you have any pointers on what could trigger this issue? I looked at the different rules applied by the planner [4] but, as I am not familiar with them, I haven't yet been able to find the root cause.

Thanks a lot for your help!

Benoit Hanotte

********************************* [1] SQL query *********************************

    WITH displays AS (
        SELECT `timestamp`, 1 as nb_displays, 0 as nb_clicks FROM my_catalog.my_db.display
    ),
   
    clicks AS (
        SELECT `timestamp`, 0 as nb_displays, 1 as nb_clicks FROM my_catalog.my_db.click
    ),
   
    counts_2h AS (
        SELECT
            SUM(nb_clicks) / SUM(nb_displays) as ctr,
            HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR) as `timestamp`
        FROM (
            (SELECT * FROM displays)
            UNION ALL
            (SELECT * FROM clicks)
        ) t
        GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
    ),
   
    counts_6h AS (
        SELECT
            SUM(nb_clicks) / SUM(nb_displays) as ctr,
            HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '6' HOUR) as `timestamp`
        FROM (
            (SELECT * FROM displays)
            UNION ALL
            (SELECT * FROM clicks)
        ) t
        GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '6' HOUR)
    )
   
    SELECT
        TUMBLE_END(`timestamp`, INTERVAL '1' HOUR) as `timestamp`,
        MAX(ctr)
    FROM (
        (SELECT * FROM counts_6h)
        UNION ALL
        (SELECT * FROM counts_2h)
    ) t
    GROUP BY TUMBLE(`timestamp`, INTERVAL '1' HOUR)


********************* [2] Logical plan (before optimization) ***********************

    LogicalProject(timestamp=[TUMBLE_END($0)], EXPR$1=[$1])
      LogicalAggregate(group=[{0}], EXPR$1=[MAX($1)])
        LogicalProject($f0=[TUMBLE($1, 3600000:INTERVAL HOUR)], ctr=[$0])
          LogicalUnion(all=[true])
            LogicalProject(ctr=[$0], timestamp=[$1])
              LogicalProject(ctr=[/($1, $2)], timestamp=[HOP_ROWTIME($0)])
                LogicalAggregate(group=[{0}], agg#0=[SUM($1)], agg#1=[SUM($2)])
                  LogicalProject($f0=[HOP($0, 3600000:INTERVAL HOUR, 21600000:INTERVAL HOUR)], nb_clicks=[$2], nb_displays=[$1])
                    LogicalProject(timestamp=[$0], nb_displays=[0], nb_clicks=[1])
                      LogicalTableScan(table=[[my_catalog, my_db, click]])
            LogicalProject(ctr=[$0], timestamp=[$1])
              LogicalProject(ctr=[/($1, $2)], timestamp=[HOP_ROWTIME($0)])
                LogicalAggregate(group=[{0}], agg#0=[SUM($1)], agg#1=[SUM($2)])
                  LogicalProject($f0=[HOP($0, 3600000:INTERVAL HOUR, 7200000:INTERVAL HOUR)], nb_clicks=[$2], nb_displays=[$1])
                    LogicalProject(timestamp=[$0], nb_displays=[1], nb_clicks=[0])
                      LogicalTableScan(table=[[my_catalog, my_db, display]])


****************** [3] Resulting physical plan (after optimization) ********************

    DataStreamCalc(select=[w$end AS timestamp, CAST(EXPR$1) AS EXPR$1]): rowcount = 400.0, cumulative cost = {3200.0 rows, 3600.0 cpu, 4800.0 io}, id = 556
      DataStreamGroupWindowAggregate(window=[TumblingGroupWindow('w$, 'timestamp, 3600000.millis)], select=[MAX(ctr) AS EXPR$1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 400.0, cumulative cost = {2800.0 rows, 3200.0 cpu, 4800.0 io}, id = 555
        DataStreamUnion(all=[true], union all=[ctr, timestamp]): rowcount = 400.0, cumulative cost = {2400.0 rows, 2800.0 cpu, 4800.0 io}, id = 554
          DataStreamCalc(select=[/(CAST($f0), CAST($f1)) AS ctr, w$rowtime AS timestamp]): rowcount = 200.0, cumulative cost = {1000.0 rows, 1200.0 cpu, 2400.0 io}, id = 548
            DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], select=[SUM(nb_clicks) AS $f0, SUM(nb_displays) AS $f1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 200.0, cumulative cost = {800.0 rows, 800.0 cpu, 2400.0 io}, id = 547
              DataStreamUnion(all=[true], union all=[timestamp, nb_displays, nb_clicks]): rowcount = 200.0, cumulative cost = {600.0 rows, 600.0 cpu, 2400.0 io}, id = 546
                DataStreamCalc(select=[timestamp, 1 AS nb_displays, 0 AS nb_clicks]): rowcount = 100.0, cumulative cost = {200.0 rows, 200.0 cpu, 1200.0 io}, id = 544
                  StreamTableSourceScan(table=[[my_catalog, my_db, display]], fields=[timestamp], source=[$anon$1(...)]): rowcount = 100.0, cumulative cost = {100.0 rows, 100.0 cpu, 1200.0 io}, id = 543
                DataStreamCalc(select=[timestamp, 0 AS nb_displays, 1 AS nb_clicks]): rowcount = 100.0, cumulative cost = {200.0 rows, 200.0 cpu, 1200.0 io}, id = 545
                  StreamTableSourceScan(table=[[my_catalog, my_db, click]], fields=[timestamp], source=[$anon$1(...)]): rowcount = 100.0, cumulative cost = {100.0 rows, 100.0 cpu, 1200.0 io}, id = 542
          DataStreamCalc(select=[/(CAST($f0), CAST($f1)) AS ctr, w$rowtime AS timestamp]): rowcount = 200.0, cumulative cost = {1000.0 rows, 1200.0 cpu, 2400.0 io}, id = 553
            DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], select=[SUM(nb_clicks) AS $f0, SUM(nb_displays) AS $f1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 200.0, cumulative cost = {800.0 rows, 800.0 cpu, 2400.0 io}, id = 552
              DataStreamUnion(all=[true], union all=[timestamp, nb_displays, nb_clicks]): rowcount = 200.0, cumulative cost = {600.0 rows, 600.0 cpu, 2400.0 io}, id = 551
                DataStreamCalc(select=[timestamp, 1 AS nb_displays, 0 AS nb_clicks]): rowcount = 100.0, cumulative cost = {200.0 rows, 200.0 cpu, 1200.0 io}, id = 549
                  StreamTableSourceScan(table=[[my_catalog, my_db, display]], fields=[timestamp], source=[$anon$1(...)]): rowcount = 100.0, cumulative cost = {100.0 rows, 100.0 cpu, 1200.0 io}, id = 543
                DataStreamCalc(select=[timestamp, 0 AS nb_displays, 1 AS nb_clicks]): rowcount = 100.0, cumulative cost = {200.0 rows, 200.0 cpu, 1200.0 io}, id = 550
                  StreamTableSourceScan(table=[[my_catalog, my_db, click]], fields=[timestamp], source=[$anon$1(...)]): rowcount = 100.0, cumulative cost = {100.0 rows, 100.0 cpu, 1200.0 io}, id = 542
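
The interval literals in the plans are in milliseconds, which makes the window sizes hard to compare at a glance. The following is a minimal Python sketch, not part of Flink, that extracts the SlidingGroupWindow entries from a plan dump and converts them to hours. The function name `sliding_windows_in_hours` is hypothetical, the plan lines are abbreviated with `...`, and the sketch assumes the first interval is the window size and the second the slide.

```python
import re

MS_PER_HOUR = 3_600_000

def sliding_windows_in_hours(plan_text):
    """Return (size_hours, slide_hours) for each SlidingGroupWindow in a plan dump."""
    # Assumption: the first interval is the window size, the second the slide.
    pattern = re.compile(
        r"SlidingGroupWindow\('w\$, 'timestamp, (\d+)\.millis, (\d+)\.millis\)"
    )
    return [
        (int(size) / MS_PER_HOUR, int(slide) / MS_PER_HOUR)
        for size, slide in pattern.findall(plan_text)
    ]

# Abbreviated copies of the two window aggregate lines from plan [3].
plan = """
DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], ...)
DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], ...)
"""

print(sliding_windows_in_hours(plan))
```

For comparison, the HOP calls in the logical plan [2] use 7200000 ms (2 hours) and 21600000 ms (6 hours) as window sizes, each with a 3600000 ms (1 hour) slide.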

Re: Incorrect Physical Plan when unioning two different windows, giving incorrect SQL query results

Till Rohrmann
Hi Benoit,

Thanks for reporting this issue. Since I'm not too familiar with the SQL component, I've pulled in Timo and Jingsong, who know much better than I do what could be wrong.

Cheers,
Till

On Mon, Jan 13, 2020 at 11:48 AM Benoit Hanotte <[hidden email]> wrote:
[...]

Re: [BULK]Re: Incorrect Physical Plan when unioning two different windows, giving incorrect SQL query results

Benoit Hanotte
Hello Till,
Thanks for your reply!
I have been able to debug the issue and reported it in https://issues.apache.org/jira/browse/FLINK-15577.
It seems the old planner does not add the window specs to the logical nodes' digests. As a result, the HepPlanner considers the two aggregations to be equivalent, although they are not, because they use different time windows. I explained the issue in more detail in the ticket above, and submitted a PR earlier today: https://github.com/apache/flink/pull/10854.
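
To illustrate the mechanism, here is a toy Python model of digest-based deduplication. It is only a sketch under simplifying assumptions, not the actual Calcite or Flink code: the `Node` class, `register`, and `planned_windows` are hypothetical names, and a real planner works on a graph of RelNodes rather than a flat list.

```python
class Node:
    """Toy stand-in for a logical window aggregate node (hypothetical)."""

    def __init__(self, op, window_hours, include_window_in_digest):
        self.op = op
        self.window_hours = window_hours
        self.include_window = include_window_in_digest

    def digest(self):
        # The digest is the string the planner uses to decide node equivalence.
        if self.include_window:
            return f"{self.op}(window={self.window_hours}h)"
        return f"{self.op}()"


def register(nodes):
    """Keep the first node seen per digest and reuse it for later duplicates."""
    seen = {}
    return [seen.setdefault(node.digest(), node) for node in nodes]


def planned_windows(include_window):
    nodes = [
        Node("WindowAggregate", 2, include_window),
        Node("WindowAggregate", 6, include_window),
    ]
    return [node.window_hours for node in register(nodes)]


print(planned_windows(False))  # [2, 2]: the second aggregate is replaced by the first
print(planned_windows(True))   # [2, 6]: distinct digests keep both windows
```

When the window is left out of the digest, the two aggregates collapse into one node, which mirrors the symptom seen in the physical plan. Once the window is part of the digest, both survive.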
Best,
Benoit

From: Till Rohrmann <[hidden email]>
Sent: Tuesday, January 14, 2020 7:13 PM
To: Benoit Hanotte <[hidden email]>
Cc: [hidden email] <[hidden email]>; Jingsong Li <[hidden email]>; [hidden email] <[hidden email]>
Subject: [BULK]Re: Incorrect Physical Plan when unioning two different windows, giving incorrect SQL query results
 
[...]

Re: [BULK]Re: Incorrect Physical Plan when unioning two different windows, giving incorrect SQL query results

Till Rohrmann
Great, thanks a lot for looking into the problem and fixing it. I assume that your PR will be merged very soon.

Cheers,
Till

On Tue, Jan 14, 2020 at 7:18 PM Benoit Hanotte <[hidden email]> wrote:
[...]