-- Courier-saver session conversion, grouped by region and 5-minute tumbling
-- window over the offer-created event's rowtime.
--
-- Output columns:
--   region_code                     - region_code of the offer-created event
--   ss_session_conversion_region_5m - text of the form '<converted>/<total>':
--       total     = number of joined rows in the group
--       converted = joined rows that have a matching ride request and no
--                   matching row from the cancellation sub-select
--   latency_marker                  - end of the 5-minute tumbling window
--
-- NOTE(review): both figures count joined rows, so an offer that matches
-- several ride requests contributes once per match - confirm this is intended.
SELECT
    offer_evt.region_code,
    CONCAT_WS(
        '/',
        CAST(
            SUM(
                CASE
                    WHEN request_evt.ride_id IS NOT NULL
                     AND cancel_evt.ride_id IS NULL
                    THEN 1
                    ELSE 0
                END
            ) AS VARCHAR
        ),
        CAST(COUNT(*) AS VARCHAR)
    ) AS ss_session_conversion_region_5m,
    TUMBLE_END(offer_evt.rowtime, INTERVAL '5' MINUTE) AS latency_marker
FROM (
    -- courier_saver offers that carry both destination coordinates
    SELECT
        region_code,
        offer_id,
        rowtime
    FROM event_offerings_offer_created
    WHERE offer_product_id = 'courier_saver'
      AND destination_lat IS NOT NULL
      AND destination_lng IS NOT NULL
) AS offer_evt
LEFT JOIN (
    -- courier_saver ride requests
    SELECT
        offer_id,
        ride_id,
        rowtime
    FROM event_ride_requested
    WHERE analytical_product_name = 'courier_saver'
) AS request_evt
    ON offer_evt.offer_id = request_evt.offer_id
    -- time-bounded join: the offer is at most one hour older than the request
    AND offer_evt.rowtime >= request_evt.rowtime - INTERVAL '1' HOUR
    AND offer_evt.rowtime <= request_evt.rowtime
LEFT JOIN (
    -- courier_saver cancellations where after_accepted is false
    -- (presumably cancellations made before acceptance - TODO confirm)
    SELECT
        ride_id,
        rowtime
    FROM event_ride_canceled
    WHERE analytical_product_name = 'courier_saver'
      AND NOT after_accepted
) AS cancel_evt
    ON request_evt.ride_id = cancel_evt.ride_id
    -- time-bounded join: the request is at most one hour older than the cancellation
    AND request_evt.rowtime >= cancel_evt.rowtime - INTERVAL '1' HOUR
    AND request_evt.rowtime <= cancel_evt.rowtime
GROUP BY
    offer_evt.region_code,
    TUMBLE(offer_evt.rowtime, INTERVAL '5' MINUTE)
Explain plan: == Abstract Syntax Tree == LogicalProject(region_code=[$0], ss_session_conversion_region_5m=[CONCAT_WS(_UTF-16LE'/', CAST($2):VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" NOT NULL, CAST($3):VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" NOT NULL)], latency_marker=[TUMBLE_END($1)])   LogicalAggregate(group=[{0, 1}], agg#0=[SUM($2)], agg#1=[COUNT()])     LogicalProject(region_code=[$0], $f1=[TUMBLE($2, 300000)], $f2=[CASE(AND(IS NOT NULL($4), IS NULL($6)), 1, 0)], $f3=[1])       LogicalJoin(condition=[AND(=($4, $6), >=($5, -($7, 3600000)), <=($5, $7))], joinType=[left])         LogicalJoin(condition=[AND(=($1, $3), >=($2, -($5, 3600000)), <=($2, $5))], 
joinType=[left])           LogicalProject(region_code=[$5], offer_id=[$4], rowtime=[$6])             LogicalFilter(condition=[AND(=($3, _UTF-16LE'courier_saver'), IS NOT NULL($2), IS NOT NULL($1))])               LogicalTableScan(table=[[event_offerings_offer_created]])           LogicalProject(offer_id=[$3], ride_id=[$1], rowtime=[$4])             LogicalFilter(condition=[=($2, _UTF-16LE'courier_saver')])               LogicalTableScan(table=[[event_ride_requested]])         LogicalProject(ride_id=[$1], rowtime=[$4])           LogicalFilter(condition=[AND(=($3, _UTF-16LE'courier_saver'), NOT($2))])             LogicalTableScan(table=[[event_ride_canceled]]) == Optimized Logical Plan == DataStreamCalc(select=[region_code, CONCAT_WS(_UTF-16LE'/', CAST($f1), CAST($f2)) AS ss_session_conversion_region_5m, w$end AS latency_marker])   DataStreamGroupWindowAggregate(groupBy=[region_code], window=[TumblingGroupWindow('w$, 'rowtime, 300000.millis)], select=[region_code, $SUM0($f2) AS $f1, COUNT(*) AS $f2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])     DataStreamCalc(select=[region_code, rowtime, CASE(AND(IS NOT NULL(ride_id), IS NULL(ride_id0)), 1, 0) AS $f2, 1 AS $f3])       DataStreamWindowJoin(where=[AND(=(ride_id, ride_id0), >=(CAST(rowtime0), -(CAST(rowtime1), 3600000)), <=(CAST(rowtime0), CAST(rowtime1)))], join=[region_code, rowtime, ride_id, rowtime0, ride_id0, rowtime1], joinType=[LeftOuterJoin])         DataStreamCalc(select=[region_code, rowtime, ride_id, rowtime0])           DataStreamWindowJoin(where=[AND(=(offer_id, offer_id0), >=(CAST(rowtime), -(CAST(rowtime0), 3600000)), <=(CAST(rowtime), CAST(rowtime0)))], join=[region_code, offer_id, rowtime, offer_id0, ride_id, rowtime0], joinType=[LeftOuterJoin])             DataStreamCalc(select=[region_code, offer_id, rowtime], where=[AND(=(offer_product_id, _UTF-16LE'courier_saver'), IS NOT NULL(destination_lat), IS NOT NULL(destination_lng))])               
DataStreamScan(table=[[event_offerings_offer_created]])             DataStreamCalc(select=[offer_id, ride_id, rowtime], where=[=(analytical_product_name, _UTF-16LE'courier_saver')])               DataStreamScan(table=[[event_ride_requested]])         DataStreamCalc(select=[ride_id, rowtime], where=[AND(=(analytical_product_name, _UTF-16LE'courier_saver'), NOT(after_accepted))])           DataStreamScan(table=[[event_ride_canceled]]) == Physical Execution Plan == Stage 1 : Data Source content : collect elements with CollectionInputFormat Stage 2 : Operator content : Timestamps/Watermarks ship_strategy : FORWARD Stage 3 : Operator content : extract_event_name ship_strategy : FORWARD Stage 4 : Operator content : group_counter_ss_session_conversion_region_5m.1.rawKinesisInput ship_strategy : FORWARD Stage 5 : Operator content : sample_with_formatter ship_strategy : FORWARD Stage 6 : Operator content : only_offerings_offer_created ship_strategy : FORWARD Stage 7 : Operator content : parse_offerings_offer_created ship_strategy : FORWARD Stage 9 : Operator content : Timestamps/Watermarks ship_strategy : SHUFFLE Stage 10 : Operator content : group_counter_event_offerings_offer_created.kinesis_records ship_strategy : FORWARD Stage 11 : Operator content : sample_with_formatter ship_strategy : FORWARD Stage 12 : Operator content : Process ship_strategy : FORWARD Stage 13 : Operator content : only_ride_requested ship_strategy : FORWARD Stage 14 : Operator content : parse_ride_requested ship_strategy : FORWARD Stage 16 : Operator content : Timestamps/Watermarks ship_strategy : SHUFFLE Stage 17 : Operator content : group_counter_event_ride_requested.kinesis_records ship_strategy : FORWARD Stage 18 : Operator content : sample_with_formatter ship_strategy : FORWARD Stage 19 : Operator content : Process ship_strategy : FORWARD Stage 20 : Operator content : only_ride_canceled ship_strategy : FORWARD Stage 21 : Operator content : parse_ride_canceled ship_strategy : FORWARD Stage 23 : 
Operator content : Timestamps/Watermarks ship_strategy : SHUFFLE Stage 24 : Operator content : group_counter_event_ride_canceled.kinesis_records ship_strategy : FORWARD Stage 25 : Operator content : sample_with_formatter ship_strategy : FORWARD Stage 26 : Operator content : Process ship_strategy : FORWARD Stage 27 : Operator content : from: (occurred_at, destination_lng, destination_lat, offer_product_id, offer_id, region_code, rowtime) ship_strategy : FORWARD Stage 28 : Operator content : where: (AND(=(offer_product_id, _UTF-16LE'courier_saver'), IS NOT NULL(destination_lat), IS NOT NULL(destination_lng))), select: (region_code, offer_id, rowtime) ship_strategy : FORWARD Stage 29 : Operator content : from: (occurred_at, ride_id, analytical_product_name, offer_id, rowtime) ship_strategy : FORWARD Stage 30 : Operator content : where: (=(analytical_product_name, _UTF-16LE'courier_saver')), select: (offer_id, ride_id, rowtime) ship_strategy : FORWARD Stage 33 : Operator content : where: (AND(=(offer_id, offer_id0), >=(CAST(rowtime), -(CAST(rowtime0), 3600000)), <=(CAST(rowtime), CAST(rowtime0)))), join: (region_code, offer_id, rowtime, offer_id0, ride_id, rowtime0) ship_strategy : HASH Stage 34 : Operator content : select: (region_code, rowtime, ride_id, rowtime0) ship_strategy : FORWARD Stage 35 : Operator content : from: (occurred_at, ride_id, after_accepted, analytical_product_name, rowtime) ship_strategy : FORWARD Stage 36 : Operator content : where: (AND(=(analytical_product_name, _UTF-16LE'courier_saver'), NOT(after_accepted))), select: (ride_id, rowtime) ship_strategy : FORWARD Stage 39 : Operator content : where: (AND(=(ride_id, ride_id0), >=(CAST(rowtime0), -(CAST(rowtime1), 3600000)), <=(CAST(rowtime0), CAST(rowtime1)))), join: (region_code, rowtime, ride_id, rowtime0, ride_id0, rowtime1) ship_strategy : HASH Stage 40 : Operator content : select: (region_code, rowtime, CASE(AND(IS NOT NULL(ride_id), IS NULL(ride_id0)), 1, 0) AS $f2, 1 AS $f3) ship_strategy : 
FORWARD Stage 41 : Operator content : time attribute: (rowtime) ship_strategy : FORWARD Stage 43 : Operator content : groupBy: (region_code), window: (TumblingGroupWindow('w$, 'rowtime, 300000.millis)), select: (region_code, $SUM0($f2) AS $f1, COUNT(*) AS $f2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime) ship_strategy : HASH Stage 44 : Operator content : select: (region_code, CONCAT_WS(_UTF-16LE'/', CAST($f1), CAST($f2)) AS ss_session_conversion_region_5m, w$end AS latency_marker) ship_strategy : FORWARD Stage 53 : Operator content : from: (occurred_at, destination_lng, destination_lat, offer_product_id, offer_id, region_code, rowtime) ship_strategy : FORWARD Stage 54 : Operator content : where: (AND(=(offer_product_id, _UTF-16LE'courier_saver'), IS NOT NULL(destination_lat), IS NOT NULL(destination_lng))), select: (region_code, offer_id, rowtime) ship_strategy : FORWARD Stage 55 : Operator content : from: (occurred_at, ride_id, analytical_product_name, offer_id, rowtime) ship_strategy : FORWARD Stage 56 : Operator content : where: (=(analytical_product_name, _UTF-16LE'courier_saver')), select: (offer_id, ride_id, rowtime) ship_strategy : FORWARD Stage 59 : Operator content : where: (AND(=(offer_id, offer_id0), >=(CAST(rowtime), -(CAST(rowtime0), 3600000)), <=(CAST(rowtime), CAST(rowtime0)))), join: (region_code, offer_id, rowtime, offer_id0, ride_id, rowtime0) ship_strategy : HASH Stage 60 : Operator content : select: (region_code, rowtime, ride_id, rowtime0) ship_strategy : FORWARD Stage 61 : Operator content : from: (occurred_at, ride_id, after_accepted, analytical_product_name, rowtime) ship_strategy : FORWARD Stage 62 : Operator content : where: (AND(=(analytical_product_name, _UTF-16LE'courier_saver'), NOT(after_accepted))), select: (ride_id, rowtime) ship_strategy : FORWARD Stage 65 : Operator content : where: (AND(=(ride_id, ride_id0), >=(CAST(rowtime0), -(CAST(rowtime1), 3600000)), <=(CAST(rowtime0), 
CAST(rowtime1)))), join: (region_code, rowtime, ride_id, rowtime0, ride_id0, rowtime1) ship_strategy : HASH Stage 66 : Operator content : select: (region_code, rowtime, CASE(AND(IS NOT NULL(ride_id), IS NULL(ride_id0)), 1, 0) AS $f2, 1 AS $f3) ship_strategy : FORWARD Stage 67 : Operator content : time attribute: (rowtime) ship_strategy : FORWARD Stage 69 : Operator content : groupBy: (region_code), window: (TumblingGroupWindow('w$, 'rowtime, 300000.millis)), select: (region_code, $SUM0($f2) AS $f1, COUNT(*) AS $f2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime) ship_strategy : HASH Stage 70 : Operator content : select: (region_code, CONCAT_WS(_UTF-16LE'/', CAST($f1), CAST($f2)) AS ss_session_conversion_region_5m, w$end AS latency_marker) ship_strategy : FORWARD