I am trying to understand how to use streaming SQL with a use case very
similar to the example from the documentation: counting the number of page
clicks in a certain period of time for each user.
I'm trying to solve the problem using both the SQL API and the Table API.
I'm using event time. My sample input stream looks like this
(timestamp, user, url):
1530229582338,John,./reports?id=311
1530229584339,Bob,./cart
1530229587339,Mary,./prod?id=1
1530229592340,Liz,./home
1530229598340,John,./reports?id=312
1530229599341,John,./reports?id=313
1530229600342,Bob,./prod?id=3
1530229601342,Mary,./prod?id=7
1530229604343,Liz,./prod?id=4
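To make the expected result concrete, here is a minimal plain-Java sketch (no Flink involved) of the counting I have in mind, assuming 1-minute tumbling windows aligned to the epoch. The class and method names (TumblingCountSketch, countPerWindow) are made up for illustration and are not part of any Flink API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TumblingCountSketch {
    // Window size of one minute, in milliseconds (assumption for this sketch).
    static final long WINDOW_MS = 60_000L;

    // Counts records per (window end, user). Keys have the form "windowEnd,user".
    static Map<String, Long> countPerWindow(String[] lines) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : lines) {
            String[] fields = line.split(",", 3); // timestamp, user, url
            long ts = Long.parseLong(fields[0]);
            // Exclusive end of the epoch-aligned window that contains ts.
            long windowEnd = ts - (ts % WINDOW_MS) + WINDOW_MS;
            counts.merge(windowEnd + "," + fields[1], 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] sample = {
            "1530229582338,John,./reports?id=311",
            "1530229584339,Bob,./cart",
            "1530229587339,Mary,./prod?id=1",
            "1530229592340,Liz,./home",
            "1530229598340,John,./reports?id=312",
            "1530229599341,John,./reports?id=313",
            "1530229600342,Bob,./prod?id=3",
            "1530229601342,Mary,./prod?id=7",
            "1530229604343,Liz,./prod?id=4"
        };
        // Prints one line per (window end, user, count).
        countPerWindow(sample).forEach((key, count) ->
            System.out.println(key + "," + count));
    }
}
```

With the sample above, all nine events fall into the same window (ending at 1530229620000), so I would expect counts of 3 for John and 2 each for Bob, Mary and Liz.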
My SQL API solution (source code below) is working, but I have a question:
It appears that the datatype of the timestamp is java.sql.Timestamp.
*Is there a way to retrieve this from the query as a long instead?*
When I examine the schema of the table returned by the SQL query, I get this:
root
|-- wend: TimeIndicatorTypeInfo(rowtime)
|-- username: String
|-- viewcount: Long
I would prefer to get the schema back like this instead:
root
|-- wend: Long
|-- username: String
|-- viewcount: Long
// Get Our Data Stream
DataStream<Tuple3<Long,String,String>> eventStream = env
    .socketTextStream(parms.get("host"), parms.getInt("port"))
    .map(new TableStreamMapper())
    .assignTimestampsAndWatermarks(new MyEventTimestampAssigner());
// Register Table
// Dynamic Table From Stream
tableEnvironment.registerDataStream("pageViews", eventStream,
    "pageViewTime.rowtime, username, url");
// Continuous Query
String continuousQuery =
    "SELECT TUMBLE_END(pageViewTime, INTERVAL '1' MINUTE) as wend, " +
    "username, COUNT(url) as viewcount FROM pageViews " +
    "GROUP BY TUMBLE(pageViewTime, INTERVAL '1' MINUTE), username";
// Dynamic Table from Continuous Query
Table rows = tableEnvironment.sqlQuery(continuousQuery);
rows.printSchema();
// Convert Results to DataStream
Table resultTable = rows
    .select("wend, username, viewcount");
TupleTypeInfo<Tuple3<Timestamp,String,Long>> tupleTypeInfo =
    new TupleTypeInfo<>(
        Types.SQL_TIMESTAMP,
        Types.STRING,
        Types.LONG);
DataStream<Tuple3<Timestamp,String,Long>> resultDataStream =
    tableEnvironment.toAppendStream(resultTable, tupleTypeInfo);
resultDataStream.print();
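One workaround I can imagine, independent of whether the query itself can return a long, is to convert the java.sql.Timestamp field after the table has been turned into a DataStream. Below is a minimal plain-Java sketch of just that conversion; the class and method names (TimestampToLongSketch, toEpochMillis) are hypothetical, and wiring it into a map step on resultDataStream is an assumption that is not shown here. I have also not verified whether time zone handling in the table-to-stream conversion affects the resulting value.

```java
import java.sql.Timestamp;

public class TimestampToLongSketch {
    // Converts a java.sql.Timestamp to milliseconds since the epoch.
    static long toEpochMillis(Timestamp ts) {
        return ts.getTime();
    }

    public static void main(String[] args) {
        // Example value: the end of the 1-minute window that contains the sample data.
        Timestamp wend = new Timestamp(1530229620000L);
        long wendAsLong = toEpochMillis(wend);
        System.out.println(wendAsLong); // prints 1530229620000
    }
}
```

This only changes the type in the resulting DataStream; the table schema printed by printSchema() would still show the time indicator type.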
Question #2 (Table API solution)
========
I'm trying to implement the same logic using the Table API.
*Question: how do I express that I want to get the count(url) instead of
just the url?*
I don't know how to do that using the Table API.
Source code is below:
// Get Our Data Stream
DataStream<Tuple3<Long,String,String>> eventStream = env
    .socketTextStream(parms.get("host"), parms.getInt("port"))
    .map(new TableStreamMapper())
    .assignTimestampsAndWatermarks(new MyEventTimestampAssigner());
// Dynamic Table From Stream
Table dynamicTable = tableEnvironment.fromDataStream(eventStream,
    "pageViewTime.rowtime, username, url");
// Continuous Query : Table API
WindowedTable windowedTable =
    dynamicTable.window(Tumble.over("1.minutes").on("pageViewTime").as("clickWindow"));
windowedTable.table().printSchema();
Table resultTable = windowedTable.table()
    .select("pageViewTime, username, url");
TupleTypeInfo<Tuple3<Timestamp,String,String>> tupleType =
    new TupleTypeInfo<>(
        Types.SQL_TIMESTAMP,
        Types.STRING,
        Types.STRING);
DataStream<Tuple3<Timestamp,String,String>> resultDataStream =
    tableEnvironment.toAppendStream(resultTable, tupleType);
resultDataStream.print();