I'm trying to determine if I'm specifying type information properly when
doing an INSERT using the JDBCAppendTableSink API. Specifically, how do I
specify timestamp and date types? It looks like I need to use
Types.SQL_TIMESTAMP for a timestamp, but BasicTypeInfo for types like
varchar, etc. Is that right? I'm having trouble finding complete examples.
I got the code below to work, but I wanted to confirm I'm doing things the
correct way. This is for an append-only insert into a Derby database table.

My DDL:

    -- simple table with a timestamp, varchar, bigint
    create table mydb.pageview_counts (
        window_end timestamp not null,
        username   varchar(40) not null,
        viewcount  bigint not null
    );

My insert statement:

    // Write Result Table to Sink
    // Configure Sink
    JDBCAppendTableSink pageViewSink = JDBCAppendTableSink.builder()
        .setDrivername("org.apache.derby.jdbc.ClientDriver")
        .setDBUrl("jdbc:derby://myhost:1527/mydb")
        .setUsername("foo")
        .setPassword("bar")
        .setBatchSize(1)
        .setQuery("INSERT INTO mydb.pageview_counts (window_end,username,viewcount) VALUES (?,?,?)")
        .setParameterTypes(Types.SQL_TIMESTAMP, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)
        .build();
Can you share the full code?

On Sun, Jul 1, 2018 at 12:49 PM chrisr123 <[hidden email]> wrote:
Full source, except for the mapper and timestamp assigner. Sample input
stream record:

    1530447316589,Mary,./home

What are the correct parameters to pass for data types in the
JDBCAppendTableSink? Am I doing this correctly?

    // Get Execution Environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    StreamTableEnvironment tableEnvironment = TableEnvironment.getTableEnvironment(env);

    // Get and Set execution parameters.
    ParameterTool parms = ParameterTool.fromArgs(args);
    env.getConfig().setGlobalJobParameters(parms);

    // Configure Checkpoint and Restart
    // configureCheckpoint(env);
    // configureRestart(env);

    // Get Our Data Stream
    DataStream<Tuple3<Long,String,String>> eventStream = env
        .socketTextStream(parms.get("host"), parms.getInt("port"))
        .map(new TableStreamMapper())
        .assignTimestampsAndWatermarks(new MyEventTimestampAssigner());

    // Register Table
    // Dynamic Table From Stream
    tableEnvironment.registerDataStream("pageViews", eventStream,
        "pageViewTime.rowtime, username, url");

    // Continuous Query
    String continuousQuery =
        "SELECT TUMBLE_START(pageViewTime, INTERVAL '1' MINUTE) as wstart, " +
        "TUMBLE_END(pageViewTime, INTERVAL '1' MINUTE) as wend, " +
        "username, COUNT(url) as viewcount FROM pageViews " +
        "GROUP BY TUMBLE(pageViewTime, INTERVAL '1' MINUTE), username";

    // Dynamic Table from Continuous Query
    Table windowedTable = tableEnvironment.sqlQuery(continuousQuery);
    windowedTable.printSchema();

    // Convert Results to DataStream
    Table resultTable = windowedTable.select("wstart, wend, username, viewcount");
    TupleTypeInfo<Tuple4<Timestamp,Timestamp,String,Long>> tupleTypeInfo =
        new TupleTypeInfo<>(Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP, Types.STRING, Types.LONG);
    DataStream<Tuple4<Timestamp,Timestamp,String,Long>> resultDataStream =
        tableEnvironment.toAppendStream(resultTable, tupleTypeInfo);
    resultDataStream.print();

    // Write Result Table to Sink
    // Configure Sink
    JDBCAppendTableSink pageViewSink = JDBCAppendTableSink.builder()
        .setDrivername("org.apache.derby.jdbc.ClientDriver")
        .setDBUrl("jdbc:derby://captain:1527/rueggerllc")
        .setUsername("chris")
        .setPassword("xxxx")
        .setBatchSize(1)
        .setQuery("INSERT INTO chris.pageclicks (window_start,window_end,username,viewcount) VALUES (?,?,?,?)")
        .setParameterTypes(Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP,
            BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)
        .build();

    // Write Result Table to Sink
    resultTable.writeToSink(pageViewSink);
    System.out.println("WRITE TO SINK");

    // Execute
    env.execute("PageViewsTumble");
Hi Chris,

Looking at the code, it seems that JDBCTypeUtil [1] is used to convert
Flink TypeInformation into a JDBC type (java.sql.Types), and SQL_TIMESTAMP
and SQL_TIME are both listed in the conversion mapping, although the
resulting JDBC types are different.

Regarding the question of whether your insert is correctly configured: it
directly relates to how your DB executes the JDBC insert command.

1. Regarding type settings: looking at JDBCOutputFormat [2], it seems you
can even execute your command without the type array, or when a type
mapping cannot be found; in that case the PreparedStatement is written with
plain Object types. I tried it on MySQL and it actually works pretty well.

2. Another question is whether your underlying DB can handle "implicit
type casts": for example, inserting an INTEGER type into a BIGINT column.
AFAIK JDBCAppendTableSink does not check compatibility before writeRecord,
so it might be a good idea to include some sanity check beforehand.

Thanks,
Rong

On Sun, Jul 1, 2018 at 5:22 AM chrisr123 <[hidden email]> wrote:
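As an illustration of point 1 (a sketch, not code from this thread): the
untyped path uses JDBCOutputFormat directly with no setSqlTypes() call.
Connection settings below are the placeholder values from the first message.

    // No setSqlTypes(): with a null type array, JDBCOutputFormat binds every
    // field via PreparedStatement.setObject() and lets the driver/DB resolve
    // the types. Placeholder connection settings from the first message.
    JDBCOutputFormat untypedFormat = JDBCOutputFormat.buildJDBCOutputFormat()
        .setDrivername("org.apache.derby.jdbc.ClientDriver")
        .setDBUrl("jdbc:derby://myhost:1527/mydb")
        .setUsername("foo")
        .setPassword("bar")
        .setBatchInterval(1)
        .setQuery("INSERT INTO mydb.pageview_counts (window_end,username,viewcount) VALUES (?,?,?)")
        .finish();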
Hi,

In addition to what Rong said:

- The types look OK.
- You can also use Types.STRING and Types.LONG instead of BasicTypeInfo.xxx.
- Beware that in the failure case, you might end up with multiple entries
  in the database table. Some databases support an upsert syntax which
  (together with key or uniqueness constraints) can ensure that each result
  is added just once, even if the query recovers from a failure.

Best,
Fabian

2018-07-01 17:25 GMT+02:00 Rong Rong <[hidden email]>:
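For example, the sink from the full source above could be written with the
shorthand constants (a drop-in swap; everything else stays the same):

    // Same sink as in the full source, using the shorthand constants from
    // org.apache.flink.api.common.typeinfo.Types instead of BasicTypeInfo:
    JDBCAppendTableSink pageViewSink = JDBCAppendTableSink.builder()
        .setDrivername("org.apache.derby.jdbc.ClientDriver")
        .setDBUrl("jdbc:derby://captain:1527/rueggerllc")
        .setUsername("chris")
        .setPassword("xxxx")
        .setBatchSize(1)
        .setQuery("INSERT INTO chris.pageclicks (window_start,window_end,username,viewcount) VALUES (?,?,?,?)")
        .setParameterTypes(Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP, Types.STRING, Types.LONG)
        .build();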
Fabian, Rong:

Thanks for the help, greatly appreciated. I am currently using a Derby
database for the append-only JDBC sink. So far I don't see a way to use a
JDBC/relational database solution for a retract/upsert use case. Is it
possible to set up a JDBC sink with Derby or MySQL so that it goes back and
updates or deletes/inserts previous rows and inserts new ones? I have not
been able to find example source code that does that.

Thanks again,
Chris

On Tue, Jul 3, 2018 at 5:24 AM, Fabian Hueske <[hidden email]> wrote:
Simplicity is the ultimate sophistication. -- Leonardo da Vinci
Hi Chris,

MySQL (and maybe other DBMSs as well) offers special syntax for upserts.
The answers to this SO question [1] recommend "INSERT INTO ... ON DUPLICATE
KEY UPDATE ..." or "REPLACE INTO ...". However, AFAIK this syntax is not
standardized and may vary from DBMS to DBMS.

Best,
Fabian

2018-07-03 12:14 GMT+02:00 Chris Ruegger <[hidden email]>:
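For illustration, here is what the two variants could look like against the
pageview_counts table from the start of this thread. This is a sketch
assuming a MySQL table with a primary key on (window_end, username), which
the Derby DDL above does not declare:

    -- Variant 1: insert, or update the existing row on a duplicate key
    INSERT INTO mydb.pageview_counts (window_end, username, viewcount)
    VALUES (?, ?, ?)
    ON DUPLICATE KEY UPDATE viewcount = VALUES(viewcount);

    -- Variant 2: delete the conflicting row (if any), then insert the new one
    REPLACE INTO mydb.pageview_counts (window_end, username, viewcount)
    VALUES (?, ?, ?);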
There is also the SQL:2003 MERGE statement, which can be used to implement
UPSERT logic. It is a bit verbose but supported by Derby [1].

Best,
Fabian

2018-07-04 10:10 GMT+02:00 Fabian Hueske <[hidden email]>:
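For illustration, a sketch of a parameterized MERGE against the
pageview_counts table from the start of the thread, using Derby's built-in
one-row table SYSIBM.SYSDUMMY1 as the source. This assumes Derby 10.11+
(where MERGE was introduced), a uniqueness constraint on
(window_end, username), and that Derby accepts the ? parameters in these
positions:

    -- Update the row if (window_end, username) already exists, else insert it
    MERGE INTO mydb.pageview_counts t
    USING SYSIBM.SYSDUMMY1
    ON t.window_end = ? AND t.username = ?
    WHEN MATCHED THEN
        UPDATE SET viewcount = ?
    WHEN NOT MATCHED THEN
        INSERT (window_end, username, viewcount) VALUES (?, ?, ?);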
+1 to this answer. MERGE is the most compatible syntax I have found when
dealing with upsert/replace. AFAIK, almost all DBMSs have some kind of
dialect regarding upsert functionality, so following the SQL standard might
be your best solution here. And yes, both the MERGE ingestion SQL and the
execution logs are going to be more complex.

--
Rong

On Wed, Jul 4, 2018 at 1:15 AM Fabian Hueske <[hidden email]> wrote:
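For illustration, a sketch of how such a MERGE could be wired into the sink
from earlier in the thread. This is an assumption, not something verified
here: JDBCAppendTableSink executes whatever parameterized statement it is
given and binds row fields to the ? markers positionally, so the windowed
table is projected with duplicated columns to supply a value for each of
the six markers. Connection settings are the placeholder values from the
first message.

    // Sketch only: line up the six ? markers of the MERGE by projecting the
    // windowed result with duplicated columns (wend, username, viewcount
    // each appear twice, matching the parameter order of the statement).
    Table upsertTable = windowedTable.select(
        "wend, username, viewcount, " +
        "wend as wend2, username as username2, viewcount as viewcount2");

    JDBCAppendTableSink upsertSink = JDBCAppendTableSink.builder()
        .setDrivername("org.apache.derby.jdbc.ClientDriver")
        .setDBUrl("jdbc:derby://myhost:1527/mydb")
        .setUsername("foo")
        .setPassword("bar")
        .setBatchSize(1)
        .setQuery(
            "MERGE INTO mydb.pageview_counts t USING SYSIBM.SYSDUMMY1 " +
            "ON t.window_end = ? AND t.username = ? " +
            "WHEN MATCHED THEN UPDATE SET viewcount = ? " +
            "WHEN NOT MATCHED THEN INSERT (window_end, username, viewcount) " +
            "VALUES (?, ?, ?)")
        .setParameterTypes(Types.SQL_TIMESTAMP, Types.STRING, Types.LONG,
                           Types.SQL_TIMESTAMP, Types.STRING, Types.LONG)
        .build();

    upsertTable.writeToSink(upsertSink);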