Flink SQL OVER window

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink SQL OVER window

Patrick Angeles
Fairly new to Flink here so this might be a newbie mistake, but here's the problem. I created the following table and view:
        

CREATE TABLE test (

    event_time     TIMESTAMP(3),

    symbol         STRING,

    price          DOUBLE,

    WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE

) WITH (

    ...

) ;

    

CREATE VIEW test_view AS (

    SELECT

        symbol,    

        TUMBLE_START (event_time, INTERVAL '1' MINUTE) AS t_start,

        AVG (price) AS avg_price,

        MIN (price) AS min_price,

        MAX (price) AS max_price

    FROM

        test 

    WHERE event_time IS NOT NULL

    GROUP BY

        TUMBLE(event_time, INTERVAL '1' MINUTE), symbol

) ;


Describe shows:

Flink SQL> describe test ;

+------------+------------------------+------+-----+--------+------------------------------------+

|       name |                   type | null | key | extras |                          watermark |

+------------+------------------------+------+-----+--------+------------------------------------+

| event_time | TIMESTAMP(3) *ROWTIME* | true |     |        | `event_time` - INTERVAL '1' MINUTE |

|     symbol |                 STRING | true |     |        |                                    |

|      price |                 DOUBLE | true |     |        |                                    |

+------------+------------------------+------+-----+--------+------------------------------------+

3 rows in set


Flink SQL> describe test_view ;

+-----------+------------------------+------+-----+--------+-----------+

|      name |                   type | null | key | extras | watermark |

+-----------+------------------------+------+-----+--------+-----------+

|    symbol |                 STRING | true |     |        |           |

|   t_start | TIMESTAMP(3) *ROWTIME* | true |     |        |           |

| avg_price |                 DOUBLE | true |     |        |           |

| min_price |                 DOUBLE | true |     |        |           |

| max_price |                 DOUBLE | true |     |        |           |

+-----------+------------------------+------+-----+--------+-----------+

5 rows in set


When I run a query over the view, I get the following error:

Flink SQL> SELECT

>     symbol,

>     t_start,

>     avg_price,

>     min_price,

>     max_price,

>     FIRST_VALUE (avg_price) OVER x AS prev_avg_price

> FROM test_view

> WINDOW x AS (

>     PARTITION BY symbol

>     ORDER BY t_start

>     ROWS BETWEEN 1 PRECEDING AND CURRENT ROW

> ) ;         

> 

[ERROR] Could not execute SQL statement. Reason:

org.apache.flink.table.api.TableException: OVER windows' ordering in stream mode must be defined on a time attribute.


Right now, to get around this, I have to materialize the results of the test table into a new table that matches the view. It seems this ought to be doable in a single job, without the intermediate materialization step. Am I missing something?

Thanks in advance. 

Patrick


 
Reply | Threaded
Open this post in threaded view
|

Re: Flink SQL OVER window

Patrick Angeles
Forgot to mention, I am using Flink 1.12.

On Fri, Jan 29, 2021 at 10:11 AM Patrick Angeles <[hidden email]> wrote:
Fairly new to Flink here so this might be a newbie mistake, but here's the problem. I created the following table and view:
        

CREATE TABLE test (

    event_time     TIMESTAMP(3),

    symbol         STRING,

    price          DOUBLE,

    WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE

) WITH (

    ...

) ;

    

CREATE VIEW test_view AS (

    SELECT

        symbol,    

        TUMBLE_START (event_time, INTERVAL '1' MINUTE) AS t_start,

        AVG (price) AS avg_price,

        MIN (price) AS min_price,

        MAX (price) AS max_price

    FROM

        test 

    WHERE event_time IS NOT NULL

    GROUP BY

        TUMBLE(event_time, INTERVAL '1' MINUTE), symbol

) ;


Describe shows:

Flink SQL> describe test ;

+------------+------------------------+------+-----+--------+------------------------------------+

|       name |                   type | null | key | extras |                          watermark |

+------------+------------------------+------+-----+--------+------------------------------------+

| event_time | TIMESTAMP(3) *ROWTIME* | true |     |        | `event_time` - INTERVAL '1' MINUTE |

|     symbol |                 STRING | true |     |        |                                    |

|      price |                 DOUBLE | true |     |        |                                    |

+------------+------------------------+------+-----+--------+------------------------------------+

3 rows in set


Flink SQL> describe test_view ;

+-----------+------------------------+------+-----+--------+-----------+

|      name |                   type | null | key | extras | watermark |

+-----------+------------------------+------+-----+--------+-----------+

|    symbol |                 STRING | true |     |        |           |

|   t_start | TIMESTAMP(3) *ROWTIME* | true |     |        |           |

| avg_price |                 DOUBLE | true |     |        |           |

| min_price |                 DOUBLE | true |     |        |           |

| max_price |                 DOUBLE | true |     |        |           |

+-----------+------------------------+------+-----+--------+-----------+

5 rows in set


When I run a query over the view, I get the following error:

Flink SQL> SELECT

>     symbol,

>     t_start,

>     avg_price,

>     min_price,

>     max_price,

>     FIRST_VALUE (avg_price) OVER x AS prev_avg_price

> FROM test_view

> WINDOW x AS (

>     PARTITION BY symbol

>     ORDER BY t_start

>     ROWS BETWEEN 1 PRECEDING AND CURRENT ROW

> ) ;         

> 

[ERROR] Could not execute SQL statement. Reason:

org.apache.flink.table.api.TableException: OVER windows' ordering in stream mode must be defined on a time attribute.


Right now, to get around this, I have to materialize the results of the test table into a new table that matches the view. It seems this ought to be doable in a single job, without the intermediate materialization step. Am I missing something?

Thanks in advance. 

Patrick


 
Reply | Threaded
Open this post in threaded view
|

Re: Flink SQL OVER window

Seth Wiesman
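You need to use TUMBLE_ROWTIME to extract a time attribute from a window; TUMBLE_START (like TUMBLE_END) returns just a plain timestamp, not a time attribute. For example, select `TUMBLE_ROWTIME(event_time, INTERVAL '1' MINUTE) AS t_rowtime` in the view and order the OVER window by `t_rowtime`. Note that it marks the window's inclusive upper bound, not its start.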


Seth

On Fri, Jan 29, 2021 at 9:14 AM Patrick Angeles <[hidden email]> wrote:
Forgot to mention, I am using Flink 1.12.

On Fri, Jan 29, 2021 at 10:11 AM Patrick Angeles <[hidden email]> wrote:
Fairly new to Flink here so this might be a newbie mistake, but here's the problem. I created the following table and view:
        

CREATE TABLE test (

    event_time     TIMESTAMP(3),

    symbol         STRING,

    price          DOUBLE,

    WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE

) WITH (

    ...

) ;

    

CREATE VIEW test_view AS (

    SELECT

        symbol,    

        TUMBLE_START (event_time, INTERVAL '1' MINUTE) AS t_start,

        AVG (price) AS avg_price,

        MIN (price) AS min_price,

        MAX (price) AS max_price

    FROM

        test 

    WHERE event_time IS NOT NULL

    GROUP BY

        TUMBLE(event_time, INTERVAL '1' MINUTE), symbol

) ;


Describe shows:

Flink SQL> describe test ;

+------------+------------------------+------+-----+--------+------------------------------------+

|       name |                   type | null | key | extras |                          watermark |

+------------+------------------------+------+-----+--------+------------------------------------+

| event_time | TIMESTAMP(3) *ROWTIME* | true |     |        | `event_time` - INTERVAL '1' MINUTE |

|     symbol |                 STRING | true |     |        |                                    |

|      price |                 DOUBLE | true |     |        |                                    |

+------------+------------------------+------+-----+--------+------------------------------------+

3 rows in set


Flink SQL> describe test_view ;

+-----------+------------------------+------+-----+--------+-----------+

|      name |                   type | null | key | extras | watermark |

+-----------+------------------------+------+-----+--------+-----------+

|    symbol |                 STRING | true |     |        |           |

|   t_start | TIMESTAMP(3) *ROWTIME* | true |     |        |           |

| avg_price |                 DOUBLE | true |     |        |           |

| min_price |                 DOUBLE | true |     |        |           |

| max_price |                 DOUBLE | true |     |        |           |

+-----------+------------------------+------+-----+--------+-----------+

5 rows in set


When I run a query over the view, I get the following error:

Flink SQL> SELECT

>     symbol,

>     t_start,

>     avg_price,

>     min_price,

>     max_price,

>     FIRST_VALUE (avg_price) OVER x AS prev_avg_price

> FROM test_view

> WINDOW x AS (

>     PARTITION BY symbol

>     ORDER BY t_start

>     ROWS BETWEEN 1 PRECEDING AND CURRENT ROW

> ) ;         

> 

[ERROR] Could not execute SQL statement. Reason:

org.apache.flink.table.api.TableException: OVER windows' ordering in stream mode must be defined on a time attribute.


Right now, to get around this, I have to materialize the results of the test table into a new table that matches the view. It seems this ought to be doable in a single job, without the intermediate materialization step. Am I missing something?

Thanks in advance. 

Patrick


 
Reply | Threaded
Open this post in threaded view
|

Re: Flink SQL OVER window

Patrick Angeles
Thanks, Seth. That did the trick!

On Fri, Jan 29, 2021 at 10:51 AM Seth Wiesman <[hidden email]> wrote:
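You need to use TUMBLE_ROWTIME to extract a time attribute from a window; TUMBLE_START (like TUMBLE_END) returns just a plain timestamp, not a time attribute. For example, select `TUMBLE_ROWTIME(event_time, INTERVAL '1' MINUTE) AS t_rowtime` in the view and order the OVER window by `t_rowtime`. Note that it marks the window's inclusive upper bound, not its start.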


Seth

On Fri, Jan 29, 2021 at 9:14 AM Patrick Angeles <[hidden email]> wrote:
Forgot to mention, I am using Flink 1.12.

On Fri, Jan 29, 2021 at 10:11 AM Patrick Angeles <[hidden email]> wrote:
Fairly new to Flink here so this might be a newbie mistake, but here's the problem. I created the following table and view:
        

CREATE TABLE test (

    event_time     TIMESTAMP(3),

    symbol         STRING,

    price          DOUBLE,

    WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE

) WITH (

    ...

) ;

    

CREATE VIEW test_view AS (

    SELECT

        symbol,    

        TUMBLE_START (event_time, INTERVAL '1' MINUTE) AS t_start,

        AVG (price) AS avg_price,

        MIN (price) AS min_price,

        MAX (price) AS max_price

    FROM

        test 

    WHERE event_time IS NOT NULL

    GROUP BY

        TUMBLE(event_time, INTERVAL '1' MINUTE), symbol

) ;


Describe shows:

Flink SQL> describe test ;

+------------+------------------------+------+-----+--------+------------------------------------+

|       name |                   type | null | key | extras |                          watermark |

+------------+------------------------+------+-----+--------+------------------------------------+

| event_time | TIMESTAMP(3) *ROWTIME* | true |     |        | `event_time` - INTERVAL '1' MINUTE |

|     symbol |                 STRING | true |     |        |                                    |

|      price |                 DOUBLE | true |     |        |                                    |

+------------+------------------------+------+-----+--------+------------------------------------+

3 rows in set


Flink SQL> describe test_view ;

+-----------+------------------------+------+-----+--------+-----------+

|      name |                   type | null | key | extras | watermark |

+-----------+------------------------+------+-----+--------+-----------+

|    symbol |                 STRING | true |     |        |           |

|   t_start | TIMESTAMP(3) *ROWTIME* | true |     |        |           |

| avg_price |                 DOUBLE | true |     |        |           |

| min_price |                 DOUBLE | true |     |        |           |

| max_price |                 DOUBLE | true |     |        |           |

+-----------+------------------------+------+-----+--------+-----------+

5 rows in set


When I run a query over the view, I get the following error:

Flink SQL> SELECT

>     symbol,

>     t_start,

>     avg_price,

>     min_price,

>     max_price,

>     FIRST_VALUE (avg_price) OVER x AS prev_avg_price

> FROM test_view

> WINDOW x AS (

>     PARTITION BY symbol

>     ORDER BY t_start

>     ROWS BETWEEN 1 PRECEDING AND CURRENT ROW

> ) ;         

> 

[ERROR] Could not execute SQL statement. Reason:

org.apache.flink.table.api.TableException: OVER windows' ordering in stream mode must be defined on a time attribute.


Right now, to get around this, I have to materialize the results of the test table into a new table that matches the view. It seems this ought to be doable in a single job, without the intermediate materialization step. Am I missing something?

Thanks in advance. 

Patrick