I am trying to understand how to use streaming SQL. My example is very similar to the one in the documentation: count the number of page clicks per user in a given period of time. I'm trying to solve the problem with both the SQL API and the Table API.

My input sample stream has the form (timestamp, user, url), and I'm using event time:

    1530229582338,John,./reports?id=311
    1530229584339,Bob,./cart
    1530229587339,Mary,./prod?id=1
    1530229592340,Liz,./home
    1530229598340,John,./reports?id=312
    1530229599341,John,./reports?id=313
    1530229600342,Bob,./prod?id=3
    1530229601342,Mary,./prod?id=7
    1530229604343,Liz,./prod?id=4

My SQL API solution is below. Everything works, but I have a question: the data type of the timestamp appears to be java.sql.Timestamp. *Is there a way to retrieve it from the query as a long instead?*

When I examine the schema of the table returned by the SQL query, I get this:

    root
     |-- wend: TimeIndicatorTypeInfo(rowtime)
     |-- username: String
     |-- viewcount: Long

I would prefer to get the schema back like this instead:

    root
     |-- wend: Long
     |-- username: String
     |-- viewcount: Long

    // Get our data stream
    DataStream<Tuple3<Long, String, String>> eventStream = env
        .socketTextStream(parms.get("host"), parms.getInt("port"))
        .map(new TableStreamMapper())
        .assignTimestampsAndWatermarks(new MyEventTimestampAssigner());

    // Register table: dynamic table from stream
    tableEnvironment.registerDataStream("pageViews", eventStream,
        "pageViewTime.rowtime, username, url");

    // Continuous query
    String continuousQuery =
        "SELECT TUMBLE_END(pageViewTime, INTERVAL '1' MINUTE) as wend, " +
        "username, COUNT(url) as viewcount FROM pageViews " +
        "GROUP BY TUMBLE(pageViewTime, INTERVAL '1' MINUTE), username";

    // Dynamic table from continuous query
    Table rows = tableEnvironment.sqlQuery(continuousQuery);
    rows.printSchema();

    // Convert results to a DataStream
    Table resultTable = rows.select("wend, username, viewcount");
    TupleTypeInfo<Tuple3<Timestamp, String, Long>> tupleTypeInfo = new TupleTypeInfo<>(
        Types.SQL_TIMESTAMP, Types.STRING, Types.LONG);
    DataStream<Tuple3<Timestamp, String, Long>> resultDataStream =
        tableEnvironment.toAppendStream(resultTable, tupleTypeInfo);
    resultDataStream.print();

Question #2 (Table API solution)
========

I'm trying to implement the same logic using the Table API. *Question: how do I express that I want count(url) instead of just url?* I don't know how to do that with the Table API.

Also, what is the purpose of the .as("clickWindow") below? How can "clickWindow" be used, and what happens if it is left out of the API calls? The source code is below:

    // Get our data stream
    DataStream<Tuple3<Long, String, String>> eventStream = env
        .socketTextStream(parms.get("host"), parms.getInt("port"))
        .map(new TableStreamMapper())
        .assignTimestampsAndWatermarks(new MyEventTimestampAssigner());

    // Dynamic table from stream
    Table dynamicTable = tableEnvironment.fromDataStream(eventStream,
        "pageViewTime.rowtime, username, url");

    // Continuous query: Table API
    WindowedTable windowedTable = dynamicTable.window(
        Tumble.over("1.minutes").on("pageViewTime").as("clickWindow"));
    windowedTable.table().printSchema();
    Table resultTable = windowedTable.table()
        .select("pageViewTime, username, url");
    TupleTypeInfo<Tuple3<Timestamp, String, String>> tupleType = new TupleTypeInfo<>(
        Types.SQL_TIMESTAMP, Types.STRING, Types.STRING);
    DataStream<Tuple3<Timestamp, String, String>> resultDataStream =
        tableEnvironment.toAppendStream(resultTable, tupleType);
    resultDataStream.print();

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
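The socket lines above have the form timestamp,user,url, but TableStreamMapper itself is not shown. As a hypothetical sketch only, assuming every line has exactly that shape with the timestamp in epoch milliseconds, the parsing step could look like this in plain Java (no Flink types, so it compiles on its own). The class and field names are illustrative. In the actual job this logic would sit inside a MapFunction<String, Tuple3<Long, String, String>>, and the timestamp assigner would return the first field as the event time.

```java
/**
 * Hypothetical stand-in for the parsing done by the (unshown) TableStreamMapper.
 * Assumes each socket line is "timestamp,user,url" with the timestamp in epoch millis.
 */
final class PageViewParser {

    /** Parsed fields; in the Flink job these would become a Tuple3<Long, String, String>. */
    static final class PageView {
        final long timestamp; // event time, later handed to the timestamp assigner
        final String user;
        final String url;

        PageView(long timestamp, String user, String url) {
            this.timestamp = timestamp;
            this.user = user;
            this.url = url;
        }
    }

    /** Splits on the first two commas only, so any comma inside the URL is preserved. */
    static PageView parse(String line) {
        String[] parts = line.trim().split(",", 3);
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected timestamp,user,url: " + line);
        }
        return new PageView(Long.parseLong(parts[0].trim()), parts[1].trim(), parts[2].trim());
    }
}
```

For example, parsing "1530229582338,John,./reports?id=311" yields timestamp 1530229582338, user "John" and url "./reports?id=311".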
Hi chrisr,

> *Is there a way to retrieve this from the query as a long instead?*

You have to convert the timestamp type to a long yourself. There seems to be no built-in UDF that converts a timestamp to a Unix timestamp, but you can write one and use it in your SQL.

> *Question: how do I express that I want to get the count(url) instead of just the url?*

.groupBy("clickWindow").select("count(url)") gives you the count value. There are more examples here[1].

On Fri, Jun 29, 2018 at 8:06 AM, chrisr123 <[hidden email]> wrote:
> I am trying to understand how to use streaming sql, very similar to the
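Following Hequn's suggestion, the conversion itself is small. The sketch below shows only the logic such a UDF would perform, as plain Java so it compiles without Flink. In a Flink job the method body would become the eval(...) method of a class extending org.apache.flink.table.functions.ScalarFunction, registered with something like tableEnvironment.registerFunction("toMillis", new ToMillis()) and used as SELECT toMillis(TUMBLE_END(pageViewTime, INTERVAL '1' MINUTE)) AS wend, ...; the names toMillis, ToMillis and TimestampToMillis are hypothetical. One assumption to check: depending on the Flink version and the JVM time zone, the Timestamp handed to the function may be shifted by the local UTC offset, so compare the output against the raw event times before relying on it.

```java
import java.sql.Timestamp;

/**
 * Sketch of the conversion a user-defined scalar function would perform.
 * Kept free of Flink classes so it compiles on its own; in a job this would be
 * the eval(...) method of a ScalarFunction subclass.
 */
final class TimestampToMillis {

    /** Returns milliseconds since the epoch, or null when the SQL value is NULL. */
    static Long eval(Timestamp ts) {
        return ts == null ? null : ts.getTime();
    }
}
```

Returning a boxed Long rather than a primitive long lets a NULL timestamp map to a NULL result instead of throwing.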
Thank you, Hequn,

I got it working. Here is the tumbling-window query in both SQL and the Table API; both give me the same results:

SQL API

    tableEnvironment.registerDataStream("pageViews", eventStream,
        "pageViewTime.rowtime, username, url");
    String continuousQuery =
        "SELECT TUMBLE_END(pageViewTime, INTERVAL '1' MINUTE) as wend, " +
        "username, COUNT(url) as viewcount FROM pageViews " +
        "GROUP BY TUMBLE(pageViewTime, INTERVAL '1' MINUTE), username";
    Table windowedTable = tableEnvironment.sqlQuery(continuousQuery);

TABLE API

    Table dynamicTable = tableEnvironment.fromDataStream(eventStream,
        "pageViewTime.rowtime, username, url");
    Table windowedTable = dynamicTable
        .window(Tumble.over("1.minutes").on("pageViewTime").as("tumblewindow"))
        .groupBy("tumblewindow, username")
        .select("tumblewindow.end as wend, username, url.count as viewcount");
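To sanity-check what either query should produce for the nine sample events, here is a plain-Java sketch (no Flink) of a one-minute tumbling window grouped by user, with the window end kept as a long. It assumes zero window offset and non-negative epoch-millisecond timestamps, so windows are aligned to multiples of 60,000 ms as Flink's tumbling event-time windows are by default. It ignores watermarks, late data, and when Flink actually emits each result; the class name TumblingCount is hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Plain-Java illustration of a 1-minute tumbling-window count per user:
 * for each (window end, user) pair, count events with time in [end - size, end).
 */
final class TumblingCount {
    static final long WINDOW_MS = 60_000L;

    /** Start of the window containing ts (zero offset, non-negative ts assumed). */
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_MS);
    }

    /** Returns "windowEnd,user" -> view count, sorted by key for stable output. */
    static Map<String, Long> count(long[] timestamps, String[] users) {
        if (timestamps.length != users.length) {
            throw new IllegalArgumentException("timestamps and users must align");
        }
        Map<String, Long> counts = new TreeMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            long windowEnd = windowStart(timestamps[i]) + WINDOW_MS;
            counts.merge(windowEnd + "," + users[i], 1L, Long::sum);
        }
        return counts;
    }
}
```

With the nine sample events, every timestamp falls into the window [1530229560000, 1530229620000), so under window end 1530229620000 the counts are John = 3, Bob = 2, Mary = 2 and Liz = 2.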