Re: Data Type of timestamp in Streaming SQL Result? Long instead of timestamp?

Posted by Hequn Cheng on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Data-Type-of-timestamp-in-Streaming-SQL-Result-Long-instead-of-timestamp-tp21065p21069.html

Hi chrisr,

> *Is there a way to retrieve this from the query as a long instead?*
You have to convert the timestamp type to a long type. There does not seem to be a built-in UDF that converts a timestamp to a Unix timestamp; however, you can write one and use it in your SQL.
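
A minimal sketch of the conversion such a UDF would perform, assuming epoch milliseconds are what is wanted. The class name TimestampToLong and the function name toEpochMillis are hypothetical. In a Flink job the class would extend org.apache.flink.table.functions.ScalarFunction and be registered through tableEnvironment.registerFunction(...) before being called in the query; that wiring is left out here so the snippet runs without Flink on the classpath.

```java
import java.sql.Timestamp;

// Hypothetical helper, not part of Flink. In a real job this class would
// extend org.apache.flink.table.functions.ScalarFunction.
class TimestampToLong {
    // Flink resolves scalar UDF calls to public methods named "eval".
    public long eval(Timestamp ts) {
        // getTime() returns milliseconds since 1970-01-01T00:00:00Z.
        return ts.getTime();
    }

    public static void main(String[] args) {
        Timestamp windowEnd = new Timestamp(1530229620000L);
        System.out.println(new TimestampToLong().eval(windowEnd));
    }
}
```

Once registered under that (assumed) name, the query could presumably select toEpochMillis(TUMBLE_END(pageViewTime, INTERVAL '1' MINUTE)) AS wend. It is worth checking the returned values against the input timestamps, since time zone handling is not covered by this sketch.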

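> *Question: how do I express that I want to get the count(url) instead of just the url?*
Calling .groupBy("clickWindow").select("count(url)") on the windowed table returns the count. To count per user, as in your SQL query, also group by username: .groupBy("clickWindow, username"). There are more examples here [1].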

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#group-windows
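
To make the expected result concrete, here is a plain-Java sketch of what a one-minute tumbling-window count per user computes over the sample rows from the question. It is an illustration only and uses no Flink API; the class and method names (WindowCountSketch, windowEnd, countClicks) are made up for this example, and it assumes non-negative millisecond timestamps and windows aligned to the epoch.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; none of this is Flink API.
class WindowCountSketch {
    static final long WINDOW_MS = 60_000L; // one-minute tumbling window

    // Exclusive end of the epoch-aligned window that contains the timestamp.
    static long windowEnd(long ts) {
        return ts - (ts % WINDOW_MS) + WINDOW_MS;
    }

    // Counts rows per "windowEnd,username" key; each row is "timestamp,user,url".
    static Map<String, Long> countClicks(String[] rows) {
        Map<String, Long> counts = new TreeMap<>();
        for (String row : rows) {
            String[] fields = row.split(",", 3);
            String key = windowEnd(Long.parseLong(fields[0])) + "," + fields[1];
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] rows = {
            "1530229582338,John,./reports?id=311",
            "1530229584339,Bob,./cart",
            "1530229587339,Mary,./prod?id=1",
            "1530229592340,Liz,./home",
            "1530229598340,John,./reports?id=312",
            "1530229599341,John,./reports?id=313",
            "1530229600342,Bob,./prod?id=3",
            "1530229601342,Mary,./prod?id=7",
            "1530229604343,Liz,./prod?id=4"
        };
        // Prints one line per (window end, user) pair with its click count.
        countClicks(rows).forEach((key, count) -> System.out.println(key + "," + count));
    }
}
```

All nine sample rows fall into the same one-minute window, so the sketch yields one count per user.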


On Fri, Jun 29, 2018 at 8:06 AM, chrisr123 <[hidden email]> wrote:
I am trying to understand how to use streaming SQL with a use case very
similar to the example from the documentation: count the number of page
clicks in a certain period of time for each user.
I'm trying to solve the problem using both the SQL API and the Table API.

My input sample stream looks like this: (timestamp, user, url)
I'm using EventTime
1530229582338,John,./reports?id=311
1530229584339,Bob,./cart
1530229587339,Mary,./prod?id=1
1530229592340,Liz,./home
1530229598340,John,./reports?id=312
1530229599341,John,./reports?id=313
1530229600342,Bob,./prod?id=3
1530229601342,Mary,./prod?id=7
1530229604343,Liz,./prod?id=4

The source code of my SQL API solution is below.
Everything is working, but I have a question:
It appears that the datatype of the timestamp is java.sql.Timestamp.
*Is there a way to retrieve this from the query as a long instead?*

When I examine the schema for the table returned by SQL query, I get this:
root
 |-- wend: TimeIndicatorTypeInfo(rowtime)
 |-- username: String
 |-- viewcount: Long

I would prefer to get the schema back like this instead:
root
 |-- wend: Long
 |-- username: String
 |-- viewcount: Long


// Get Our Data Stream
DataStream<Tuple3<Long,String,String>> eventStream = env
                .socketTextStream(parms.get("host"), parms.getInt("port"))
                .map(new TableStreamMapper())
                .assignTimestampsAndWatermarks(new MyEventTimestampAssigner());


// Register Table
// Dynamic Table From Stream
tableEnvironment.registerDataStream("pageViews", eventStream,
"pageViewTime.rowtime, username, url");

// Continuous Query
String continuousQuery =
        "SELECT TUMBLE_END(pageViewTime, INTERVAL '1' MINUTE) as wend, " +
        "username, COUNT(url) as viewcount FROM pageViews " +
        "GROUP BY TUMBLE(pageViewTime, INTERVAL '1' MINUTE), username";

// Dynamic Table from Continuous Query
Table rows = tableEnvironment.sqlQuery(continuousQuery);
rows.printSchema();     

// Convert Results to DataStream
Table resultTable = rows
        .select("wend, username,viewcount");


TupleTypeInfo<Tuple3<Timestamp,String,Long>> tupleTypeInfo =
        new TupleTypeInfo<>(
                Types.SQL_TIMESTAMP,
                Types.STRING,
                Types.LONG);
DataStream<Tuple3<Timestamp,String,Long>> resultDataStream =
        tableEnvironment.toAppendStream(resultTable, tupleTypeInfo);
resultDataStream.print();



Question #2 (Table API solution)
========
I'm trying to implement the same logic using the Table API.
*Question: how do I express that I want to get the count(url) instead of
just the url?*
I don't know how to do that using the Table API.
The source code is below:

                // Get Our Data Stream
                DataStream<Tuple3<Long,String,String>> eventStream = env
                                .socketTextStream(parms.get("host"), parms.getInt("port"))
                                .map(new TableStreamMapper())
                                .assignTimestampsAndWatermarks(new MyEventTimestampAssigner());


                // Dynamic Table From Stream
                Table dynamicTable = tableEnvironment.fromDataStream(eventStream,
                                "pageViewTime.rowtime, username, url");


                // Continuous Query : Table API
                WindowedTable windowedTable = dynamicTable
                                .window(Tumble.over("1.minutes").on("pageViewTime").as("clickWindow"));
                windowedTable.table().printSchema();


                Table resultTable = windowedTable.table()
                        .select("pageViewTime,username,url");           
                TupleTypeInfo<Tuple3<Timestamp,String,String>> tupleType =
                                new TupleTypeInfo<>(
                                                Types.SQL_TIMESTAMP,
                                                Types.STRING,
                                                Types.STRING);
                DataStream<Tuple3<Timestamp,String,String>> resultDataStream =
                                tableEnvironment.toAppendStream(resultTable, tupleType);
                resultDataStream.print();