how to use the TUMBLE(time_attr, interval) window function in Flink SQL

maidangdang44
Below is one line of my source; the body contains the user logs:
{
    body: [
        "user1,url1,2018-10-23 00:00:00;user2,url2,2018-10-23 00:01:00;user3,url3,2018-10-23 00:02:00"
    ]
}


I use LATERAL TABLE and a user-defined TableFunction to flat-map the source into a new table, log, and I want to group by time and username. Here is my code:

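import java.text.ParseException;
import java.text.SimpleDateFormat;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// Splits "user,url,yyyy-MM-dd HH:mm:ss;..." into (user, url, epoch millis) rows.
public class BodySplitFun extends TableFunction<Tuple3<String, String, Long>> {
    private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public void eval(Row bodyRow) {
        String body = bodyRow.getField(0).toString();
        for (String line : body.split(";")) {
            String[] fields = line.split(",");
            try {
                // the third column is emitted as a Long (epoch millis), i.e. a BIGINT in SQL
                collect(new Tuple3<>(fields[0], fields[1], sdf.parse(fields[2]).getTime()));
            } catch (ParseException e) {
                throw new RuntimeException("Cannot parse timestamp: " + fields[2], e);
            }
        }
    }
}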

tblEnv.registerFunction("bodySplit", new BodySplitFun());
// SELECT statements go through sqlQuery(); sqlUpdate() only accepts INSERT INTO
Table result = tblEnv.sqlQuery(
    "SELECT COUNT(username) " +
    "FROM ( " +
    "  SELECT username, url, sTime " +
    "  FROM mySource LEFT JOIN LATERAL TABLE(bodySplit(body)) AS T(username, url, sTime) ON TRUE " +
    ") log " +
    "GROUP BY TUMBLE(log.sTime, INTERVAL '1' MINUTE), log.username");

When I run my program, I get this error message:
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply 'TUMBLE' to arguments of type 'TUMBLE(<BIGINT>, <INTERVAL DAY>)'. Supported form(s): 'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>)'
'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>, <TIME>)'
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
        at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
        at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
        ... 49 more

How can I group by sTime in the table log?

Re: how to use the TUMBLE(time_attr, interval) window function in Flink SQL

Dawid Wysakowicz-2

Hi,

The problem is that sTime is not a time attribute[1]: it is a plain BIGINT column, while TUMBLE has to be applied to a time attribute, which is tied to the watermark mechanism. As far as I know, you currently cannot create a time attribute from within a TableFunction.
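To illustrate why the window column has to be tied to watermarks, here is a plain-Java sketch (no Flink dependency) of an event-time tumbling window count like the one the query asks for: each timestamp is floored to the start of its 1-minute window, counts are kept per (window start, username), and a window is only emitted once the watermark reaches its last millisecond, which roughly mirrors Flink's event-time trigger. The class and method names are hypothetical, and the real runtime is considerably more involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of TUMBLE(sTime, INTERVAL '1' MINUTE) grouped by username.
public class TumbleSketch {
    static final long WINDOW_MS = 60_000L; // INTERVAL '1' MINUTE

    // window start -> (username -> count); TreeMaps keep output ordered and deterministic
    private final TreeMap<Long, Map<String, Long>> open = new TreeMap<>();

    // Assign an event to the tumbling window that contains its timestamp.
    public void add(String user, long eventTimeMs) {
        long windowStart = eventTimeMs - Math.floorMod(eventTimeMs, WINDOW_MS);
        open.computeIfAbsent(windowStart, k -> new TreeMap<>()).merge(user, 1L, Long::sum);
    }

    // Emit every window whose last millisecond (start + size - 1) is <= the watermark,
    // formatted as "windowStart,user,count".
    public List<String> onWatermark(long watermarkMs) {
        List<String> out = new ArrayList<>();
        while (!open.isEmpty() && open.firstKey() + WINDOW_MS - 1 <= watermarkMs) {
            Map.Entry<Long, Map<String, Long>> w = open.pollFirstEntry();
            for (Map.Entry<String, Long> e : w.getValue().entrySet()) {
                out.add(w.getKey() + "," + e.getKey() + "," + e.getValue());
            }
        }
        return out;
    }
}
```

Without a watermark, nothing in this model ever tells a window that it is complete, which is why Flink insists that the windowed column be a declared time attribute rather than an arbitrary BIGINT.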

What you could do instead is perform the splitting in the DataStream API, assign timestamps and watermarks there, and register the resulting stream as a table in the TableEnvironment. You can then apply the windowing to a table prepared that way.
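The sketch below shows, in plain Java, the two pieces that would move out of the TableFunction: splitting the body into events with an epoch-millisecond timestamp, and a bounded-out-of-orderness watermark (highest timestamp seen minus a fixed delay). All names here are hypothetical, and parsing in UTC is an assumption made for determinism. In a Flink 1.6 job, the split would typically run in a `flatMap`, the watermark rule corresponds roughly to `BoundedOutOfOrdernessTimestampExtractor` passed to `assignTimestampsAndWatermarks`, and a stream of `Tuple3<String, String, Long>` could then be registered with an event-time attribute along the lines of `tblEnv.registerDataStream("log", stream, "username, url, sTime.rowtime")`, as described in [1].

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring the work a DataStream job would do before the table is registered.
public class LogSplitSketch {
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // One parsed log entry; sTime is epoch milliseconds, parsed as UTC (an assumption).
    public static final class LogEvent {
        public final String username;
        public final String url;
        public final long sTime;

        LogEvent(String username, String url, long sTime) {
            this.username = username;
            this.url = url;
            this.sTime = sTime;
        }
    }

    // Split "user,url,yyyy-MM-dd HH:mm:ss;..." into events (the flatMap step).
    public static List<LogEvent> split(String body) {
        List<LogEvent> out = new ArrayList<>();
        for (String line : body.split(";")) {
            String[] f = line.split(",");
            long ts = LocalDateTime.parse(f[2], FMT).toInstant(ZoneOffset.UTC).toEpochMilli();
            out.add(new LogEvent(f[0], f[1], ts));
        }
        return out;
    }

    // Watermark = highest event time seen so far minus the allowed out-of-orderness.
    public static final class BoundedWatermark {
        private final long maxOutOfOrdernessMs;
        private long maxSeen = Long.MIN_VALUE;

        public BoundedWatermark(long maxOutOfOrdernessMs) {
            this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        }

        // Returns the watermark after observing an event; late events never move it backwards.
        public long onEvent(long eventTimeMs) {
            maxSeen = Math.max(maxSeen, eventTimeMs);
            return maxSeen - maxOutOfOrdernessMs;
        }
    }
}
```

`DateTimeFormatter` is used instead of `SimpleDateFormat` here because it is immutable and thread-safe; either works inside a single Flink operator instance.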

Best,

Dawid

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/streaming.html#time-attributes


On 23/10/2018 06:45, maidangdang44 wrote:

Re: how to use the TUMBLE(time_attr, interval) window function in Flink SQL

maidangdang44
This was very helpful. Thank you very much :)

On 10/23/2018 15:20, [hidden email] wrote: