Passing type information to JDBCAppendTableSink


Passing type information to JDBCAppendTableSink

chrisr123
I'm trying to determine whether I'm specifying type information properly when
doing an INSERT with the JDBCAppendTableSink API. Specifically, how do I specify
timestamp and date types? It looks like I need to use Types.SQL_TIMESTAMP for a
timestamp, but BasicTypeInfo for types like varchar, etc.

I'm having trouble finding complete examples. I got the code below to work, but
I wanted to confirm I'm doing things the correct way.

This is for an append-only insert into a Derby database table.

My DDL
-- simple table with a timestamp, varchar, bigint
create table mydb.pageview_counts
(window_end timestamp not null,
 username varchar(40) not null,
 viewcount bigint not null);

My sink configuration:

// Configure sink for the result table
JDBCAppendTableSink pageViewSink = JDBCAppendTableSink.builder()
        .setDrivername("org.apache.derby.jdbc.ClientDriver")
        .setDBUrl("jdbc:derby://myhost:1527/mydb")
        .setUsername("foo")
        .setPassword("bar")
        .setBatchSize(1)
        .setQuery("INSERT INTO mydb.pageview_counts " +
                "(window_end,username,viewcount) VALUES (?,?,?)")
        .setParameterTypes(Types.SQL_TIMESTAMP,
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.LONG_TYPE_INFO)
        .build();






Re: Passing type information to JDBCAppendTableSink

miki haiat
Can you share the full code?


Re: Passing type information to JDBCAppendTableSink

chrisr123

Full source below, except for the mapper and timestamp assigner.

Sample input stream record:
1530447316589,Mary,./home

What are the correct parameters to pass for the data types in the
JDBCAppendTableSink? Am I doing this correctly?


        // Get execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tableEnvironment = TableEnvironment.getTableEnvironment(env);

        // Get and set execution parameters
        ParameterTool parms = ParameterTool.fromArgs(args);
        env.getConfig().setGlobalJobParameters(parms);

        // Configure checkpoint and restart
        // configureCheckpoint(env);
        // configureRestart(env);

        // Get our data stream
        DataStream<Tuple3<Long,String,String>> eventStream = env
                .socketTextStream(parms.get("host"), parms.getInt("port"))
                .map(new TableStreamMapper())
                .assignTimestampsAndWatermarks(new MyEventTimestampAssigner());

        // Register dynamic table from the stream
        tableEnvironment.registerDataStream("pageViews", eventStream,
                "pageViewTime.rowtime, username, url");

        // Continuous query
        String continuousQuery =
                "SELECT TUMBLE_START(pageViewTime, INTERVAL '1' MINUTE) as wstart, " +
                "TUMBLE_END(pageViewTime, INTERVAL '1' MINUTE) as wend, " +
                "username, COUNT(url) as viewcount FROM pageViews " +
                "GROUP BY TUMBLE(pageViewTime, INTERVAL '1' MINUTE), username";

        // Dynamic table from the continuous query
        Table windowedTable = tableEnvironment.sqlQuery(continuousQuery);
        windowedTable.printSchema();

        // Convert results to a DataStream
        Table resultTable = windowedTable
                .select("wstart, wend, username, viewcount");

        TupleTypeInfo<Tuple4<Timestamp,Timestamp,String,Long>> tupleTypeInfo = new TupleTypeInfo<>(
                Types.SQL_TIMESTAMP,
                Types.SQL_TIMESTAMP,
                Types.STRING,
                Types.LONG);
        DataStream<Tuple4<Timestamp,Timestamp,String,Long>> resultDataStream =
                tableEnvironment.toAppendStream(resultTable, tupleTypeInfo);
        resultDataStream.print();

        // Configure sink
        JDBCAppendTableSink pageViewSink = JDBCAppendTableSink.builder()
                .setDrivername("org.apache.derby.jdbc.ClientDriver")
                .setDBUrl("jdbc:derby://captain:1527/rueggerllc")
                .setUsername("chris")
                .setPassword("xxxx")
                .setBatchSize(1)
                .setQuery("INSERT INTO chris.pageclicks " +
                        "(window_start,window_end,username,viewcount) VALUES (?,?,?,?)")
                .setParameterTypes(Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP,
                        BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)
                .build();

        // Write result table to sink
        resultTable.writeToSink(pageViewSink);
        System.out.println("WRITE TO SINK");

        // Execute
        env.execute("PageViewsTumble");
}




Re: Passing type information to JDBCAppendTableSink

Rong Rong
Hi Chris,

Looking at the code, it seems JDBCTypeUtil [1] is used to convert Flink TypeInformation into JDBC types (java.sql.Types), and SQL_TIMESTAMP and SQL_TIME are both listed in the conversion mapping; note, though, that the JDBC types they map to are different.

Whether your insert is correctly configured depends directly on how your DB executes the JDBC insert command.
1. Regarding type settings: looking at JDBCOutputFormat [2], it seems you can even execute your command without a type array (or when a type mapping cannot be found); in that case the PreparedStatement is filled with plain Object values. I tried it on MySQL and it actually works pretty well.
2. Another question is whether your underlying DB can handle an "implicit type cast", for example inserting an INTEGER value into a BIGINT column. AFAIK the JDBCAppendTableSink does not check compatibility before writeRecord, so it might be a good idea to include some sanity check beforehand.
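
To illustrate point 1, here is a minimal, untested sketch that writes a Row stream through JDBCOutputFormat directly, without a type array, so every field is bound via setObject and any conversion is left to the database. rowStream here is a hypothetical DataStream<Row> whose three fields match the query's parameters:

import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.types.Row;

// No setSqlTypes() call: without a type array, JDBCOutputFormat falls back
// to PreparedStatement.setObject(..) for each field.
JDBCOutputFormat outputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
        .setDrivername("org.apache.derby.jdbc.ClientDriver")
        .setDBUrl("jdbc:derby://myhost:1527/mydb")
        .setUsername("foo")
        .setPassword("bar")
        .setQuery("INSERT INTO mydb.pageview_counts (window_end,username,viewcount) VALUES (?,?,?)")
        .finish();

// rowStream: DataStream<Row> with (Timestamp, String, Long) fields
rowStream.writeUsingOutputFormat(outputFormat);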

Thanks,
Rong



Re: Passing type information to JDBCAppendTableSink

Fabian Hueske-2
Hi,

In addition to what Rong said:

- The types look OK.
- You can also use Types.STRING and Types.LONG instead of BasicTypeInfo.xxx (see the sketch below).
- Beware that in the failure case you might end up with multiple entries in the database table. Some databases support an upsert syntax which (together with key or uniqueness constraints) can ensure that each result is added just once, even if the query recovers from a failure.
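
For example, the sink from the original post could use the Types shorthand throughout; this is a sketch of the same builder call with only the type constants swapped (assuming the org.apache.flink.api.common.typeinfo.Types class):

import org.apache.flink.api.common.typeinfo.Types;

// Same sink as before; only setParameterTypes changed
JDBCAppendTableSink pageViewSink = JDBCAppendTableSink.builder()
        .setDrivername("org.apache.derby.jdbc.ClientDriver")
        .setDBUrl("jdbc:derby://captain:1527/rueggerllc")
        .setUsername("chris")
        .setPassword("xxxx")
        .setBatchSize(1)
        .setQuery("INSERT INTO chris.pageclicks " +
                "(window_start,window_end,username,viewcount) VALUES (?,?,?,?)")
        .setParameterTypes(Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP,
                Types.STRING, Types.LONG)
        .build();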

Best, Fabian



Re: Passing type information to JDBCAppendTableSink

chrisr123
Fabian, Rong:
Thanks for the help, greatly appreciated.

I am currently using a Derby database for the append-only JDBC sink.
So far I don't see a way to use a JDBC/relational database solution for a retract/upsert use case.
Is it possible to set up a JDBC sink with Derby or MySQL so that it goes back and updates or deletes/re-inserts previous rows while inserting new ones?
I have not been able to find example source code that does that.
Thanks again,
Chris






--
----------------------------------------------------------------------------------------------------------------------------------------
Simplicity is the ultimate sophistication
--Leonardo DaVinci

Re: Passing type information to JDBCAppendTableSink

Fabian Hueske-2
Hi Chris,

MySQL (and maybe other DBMS as well) offers special syntax for upserts.

The answers to this SO question [1] recommend "INSERT INTO ... ON DUPLICATE KEY UPDATE ..." or "REPLACE INTO ...".
However, AFAIK this syntax is not standardized and might vary from DBMS to DBMS.
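
As an illustration (not something I have tested with this sink), the MySQL variant could look like the sketch below; it assumes a UNIQUE key on (window_end, username) so that a re-emitted window updates the existing row instead of duplicating it:

// MySQL-specific upsert as the sink query; driver/URL/table names follow
// the earlier example and assume a UNIQUE key on (window_end, username)
JDBCAppendTableSink upsertSink = JDBCAppendTableSink.builder()
        .setDrivername("com.mysql.jdbc.Driver")
        .setDBUrl("jdbc:mysql://myhost:3306/mydb")
        .setUsername("foo")
        .setPassword("bar")
        .setBatchSize(1)
        .setQuery("INSERT INTO pageview_counts (window_end,username,viewcount) " +
                "VALUES (?,?,?) " +
                "ON DUPLICATE KEY UPDATE viewcount = VALUES(viewcount)")
        .setParameterTypes(Types.SQL_TIMESTAMP, Types.STRING, Types.LONG)
        .build();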

Best, Fabian




Re: Passing type information to JDBCAppendTableSink

Fabian Hueske-2
There is also the SQL:2003 MERGE statement that can be used to implement UPSERT logic.
It is a bit verbose but supported by Derby [1].
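
As a rough, untested sketch of the statement shape: Derby supports MERGE since 10.11, requires CASTs around otherwise untyped ? parameters, and SYSIBM.SYSDUMMY1 can serve as a one-row source table. Note that the sink binds row fields to the ?s positionally, so this six-parameter form would need a six-field row (values repeated) before it could be used as the sink query:

// SQL:2003 MERGE, Derby flavor (sketch only, untested)
String mergeStmt =
        "MERGE INTO mydb.pageview_counts t " +
        "USING SYSIBM.SYSDUMMY1 " +
        "ON t.window_end = CAST(? AS TIMESTAMP) " +
        "AND t.username = CAST(? AS VARCHAR(40)) " +
        "WHEN MATCHED THEN UPDATE SET viewcount = ? " +
        "WHEN NOT MATCHED THEN INSERT (window_end, username, viewcount) " +
        "VALUES (?, ?, ?)";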

Best, Fabian





Re: Passing type information to JDBCAppendTableSink

Rong Rong
+1 to this answer.

MERGE is the most compatible syntax I have found for upsert / replace.

AFAIK, almost all DBMS have some kind of dialect regarding upsert functionality, so following the SQL standard might be your best option here.
And yes, both the MERGE ingestion SQL and the execution logic are going to be more complex.

--
Rong
