Data Type of timestamp in Streaming SQL Result? Long instead of timestamp?


chrisr123
I am trying to understand how to use streaming SQL with a case very similar
to the example from the documentation: counting the number of page clicks
per user in a certain period of time.
I'm trying to solve the problem using both the SQL API and the Table API.

My sample input stream looks like this: (timestamp, user, url)
I'm using event time.
1530229582338,John,./reports?id=311
1530229584339,Bob,./cart
1530229587339,Mary,./prod?id=1
1530229592340,Liz,./home
1530229598340,John,./reports?id=312
1530229599341,John,./reports?id=313
1530229600342,Bob,./prod?id=3
1530229601342,Mary,./prod?id=7
1530229604343,Liz,./prod?id=4

My SQL API solution source code is below.
Everything is working, but I have a question:
It appears that the datatype of the timestamp is java.sql.Timestamp.
*Is there a way to retrieve this from the query as a long instead?*

When I examine the schema for the table returned by SQL query, I get this:
root
 |-- wend: TimeIndicatorTypeInfo(rowtime)
 |-- username: String
 |-- viewcount: Long

I would prefer to get the schema back like this instead:
root
 |-- wend: Long
 |-- username: String
 |-- viewcount: Long


// Get Our Data Stream
DataStream<Tuple3<Long, String, String>> eventStream = env
                .socketTextStream(parms.get("host"), parms.getInt("port"))
                .map(new TableStreamMapper())
                .assignTimestampsAndWatermarks(new MyEventTimestampAssigner());


// Register Table
// Dynamic Table From Stream
tableEnvironment.registerDataStream("pageViews", eventStream,
        "pageViewTime.rowtime, username, url");

// Continuous Query
String continuousQuery =
        "SELECT TUMBLE_END(pageViewTime, INTERVAL '1' MINUTE) as wend, " +
        "username, COUNT(url) as viewcount FROM pageViews " +
        "GROUP BY TUMBLE(pageViewTime, INTERVAL '1' MINUTE), username";

// Dynamic Table from Continuous Query
Table rows = tableEnvironment.sqlQuery(continuousQuery);
rows.printSchema();

// Convert Results to DataStream
Table resultTable = rows
        .select("wend, username,viewcount");


TupleTypeInfo<Tuple3<Timestamp, String, Long>> tupleTypeInfo =
        new TupleTypeInfo<>(
                Types.SQL_TIMESTAMP,
                Types.STRING,
                Types.LONG);
DataStream<Tuple3<Timestamp, String, Long>> resultDataStream =
        tableEnvironment.toAppendStream(resultTable, tupleTypeInfo);
resultDataStream.print();



Question #2 (Table API solution)
================================
I'm trying to implement the same logic using the Table API.
*Question: how do I express that I want to get the count(url) instead of
just the url?*
I don't know how to do that using the Table API.

Also, what is the purpose of the as("clickWindow") call below?
How can "clickWindow" be used? What happens if it's left out of the API calls?

Source code is below:

// Get Our Data Stream
DataStream<Tuple3<Long, String, String>> eventStream = env
        .socketTextStream(parms.get("host"), parms.getInt("port"))
        .map(new TableStreamMapper())
        .assignTimestampsAndWatermarks(new MyEventTimestampAssigner());

// Dynamic Table From Stream
Table dynamicTable = tableEnvironment.fromDataStream(eventStream,
        "pageViewTime.rowtime, username, url");

// Continuous Query : Table API
WindowedTable windowedTable = dynamicTable
        .window(Tumble.over("1.minutes").on("pageViewTime").as("clickWindow"));
windowedTable.table().printSchema();

Table resultTable = windowedTable.table()
        .select("pageViewTime,username,url");
TupleTypeInfo<Tuple3<Timestamp, String, String>> tupleType =
        new TupleTypeInfo<>(
                Types.SQL_TIMESTAMP,
                Types.STRING,
                Types.STRING);
DataStream<Tuple3<Timestamp, String, String>> resultDataStream =
        tableEnvironment.toAppendStream(resultTable, tupleType);
resultDataStream.print();

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Data Type of timestamp in Streaming SQL Result? Long instead of timestamp?

Hequn Cheng
Hi chrisr,

> *Is there a way to retrieve this from the query as a long instead?*
You have to convert the timestamp type to a long type yourself. It seems there is no built-in function to convert a timestamp to a Unix timestamp, but you can write a UDF and use it in your SQL.
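A minimal sketch of what the body of such a function could look like. The class name TimestampToLong is hypothetical and not from this thread. It is written as plain Java so that it compiles without Flink on the classpath; to call it from SQL, the assumption is that the class would extend org.apache.flink.table.functions.ScalarFunction and be registered on the table environment under a name of your choice, so please check the UDF documentation for your Flink version. It is also worth comparing the output against the raw input once, since the Timestamp handed to user code may be shifted by the JVM's default time zone.

```java
import java.sql.Timestamp;

// Hypothetical helper, not part of Flink or of the code in this thread.
// Assumption: inside a Flink job this class would extend ScalarFunction,
// and Flink would call the public eval method once per row.
final class TimestampToLong {

    // Returns the number of milliseconds since the epoch held by the Timestamp.
    // Null input is not handled in this sketch.
    public long eval(Timestamp ts) {
        return ts.getTime();
    }
}
```

With a function like this registered, the SELECT clause could wrap TUMBLE_END(...) in a call to it, and the first field of the result tuple would then be declared as Types.LONG instead of Types.SQL_TIMESTAMP.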

> *Question: how do I express that I want to get the count(url) instead of just the url?*
Adding .groupBy("clickWindow").select("count(url)") gives you the count value. There are more examples here[1].



On Fri, Jun 29, 2018 at 8:06 AM, chrisr123 <[hidden email]> wrote:

Re: Data Type of timestamp in Streaming SQL Result? Long instead of timestamp?

chrisr123
Thank you Hequn,
I got it working. Here is the tumbling window query in both the SQL API and
the Table API.
I'm getting the same results with both:

SQL API
tableEnvironment.registerDataStream("pageViews", eventStream,
        "pageViewTime.rowtime, username, url");
String continuousQuery =
    "SELECT TUMBLE_END(pageViewTime, INTERVAL '1' MINUTE) as wend, " +
    "username, COUNT(url) as viewcount FROM pageViews " +
    "GROUP BY TUMBLE(pageViewTime, INTERVAL '1' MINUTE), username";
Table windowedTable = tableEnvironment.sqlQuery(continuousQuery);

TABLE API
Table dynamicTable = tableEnvironment.fromDataStream(eventStream,
        "pageViewTime.rowtime, username, url");
Table windowedTable = dynamicTable
    .window(Tumble.over("1.minutes").on("pageViewTime").as("tumblewindow"))
    .groupBy("tumblewindow, username")
    .select("tumblewindow.end as wend, username, url.count as viewcount");
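
For anyone who wants to sanity-check the numbers, here is a rough plain-Java sketch (no Flink involved) of what both queries compute on the sample input from the first post: rows are grouped by the exclusive end of their 1-minute tumbling window and by username, then counted. The class and method names are made up for illustration, and the sketch assumes windows aligned to the epoch and non-negative timestamps. It ignores watermarks and late data entirely.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: names are hypothetical and nothing here uses Flink.
final class TumblingCountSketch {

    static final long WINDOW_MS = 60_000L; // INTERVAL '1' MINUTE

    // Sample rows from the first post: timestamp,user,url
    static final String[] SAMPLE = {
        "1530229582338,John,./reports?id=311",
        "1530229584339,Bob,./cart",
        "1530229587339,Mary,./prod?id=1",
        "1530229592340,Liz,./home",
        "1530229598340,John,./reports?id=312",
        "1530229599341,John,./reports?id=313",
        "1530229600342,Bob,./prod?id=3",
        "1530229601342,Mary,./prod?id=7",
        "1530229604343,Liz,./prod?id=4"
    };

    // Exclusive end of the epoch-aligned tumbling window that contains ts.
    static long windowEnd(long ts) {
        return ts - (ts % WINDOW_MS) + WINDOW_MS;
    }

    // Counts rows per "windowEnd,username" key, keeping first-seen order.
    static Map<String, Long> count(String[] lines) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : lines) {
            String[] fields = line.split(",", 3);
            long wend = windowEnd(Long.parseLong(fields[0]));
            counts.merge(wend + "," + fields[1], 1L, Long::sum);
        }
        return counts;
    }
}
```

All nine sample rows fall into the window ending at 1530229620000, so the result has one entry per user: John 3, Bob 2, Mary 2, Liz 2.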



