PostgreSQL JDBC Sink, "writeRecord() failed" and "Batch element cancelled" on upsert

Benoît Hanotte
Hi,

I'm trying to use a PostgreSQL database with PostGIS as a sink for my data.
It works perfectly when performing an INSERT with my data, but when trying to perform an UPSERT it fails with the following errors:

15/01/14 12:52:08 ERROR operators.DataSinkTask: Error in user code: writeRecord() failed:  DataSink(org.apache.flink.api.java.io.jdbc.JDBCOutputFormat@77a8fb80) (4/4)
java.lang.IllegalArgumentException: writeRecord() failed
        at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:132)
        at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:41)
        at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:173)
        at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
        at java.lang.Thread.run(Thread.java:744)
Caused by: java.sql.BatchUpdateException: L'élément du batch 0 SELECT upsert_nsp_db(-42.48112678527832,-73.76546859741211,8.0,0.0) a été annulé. Appeler getNextException pour en connaître la cause. (Translates into Batch entry 0 SELECT upsert_nsp_db(-42.48112678527832,-73.76546859741211,8.0,0.0) was aborted. Call getNextException to see the cause.)
        at org.postgresql.jdbc2.AbstractJdbc2Statement$BatchResultHandler.handleError(AbstractJdbc2Statement.java:2743)
        at org.postgresql.jdbc2.AbstractJdbc2Statement$BatchResultHandler.handleResultRows(AbstractJdbc2Statement.java:2692)
        at org.postgresql.core.v3.QueryExecutorImpl$ErrorTrackingResultHandler.handleResultRows(QueryExecutorImpl.java:333)
        at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:1853)
        at org.postgresql.core.v3.QueryExecutorImpl.sendQuery(QueryExecutorImpl.java:1130)
        at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:396)
        at org.postgresql.jdbc2.AbstractJdbc2Statement.executeBatch(AbstractJdbc2Statement.java:2892)
        at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:127)
        ... 4 more
15/01/14 12:52:08 WARN operators.DataSinkTask: Error closing the ouput format.:  DataSink(org.apache.flink.api.java.io.jdbc.JDBCOutputFormat@77a8fb80) (4/4)
java.lang.IllegalArgumentException: close() failed
        at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.close(JDBCOutputFormat.java:188)
        at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:202)
        at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
        at java.lang.Thread.run(Thread.java:744)
Caused by: org.postgresql.util.PSQLException: Ce statement a été fermé. (Translates into This statement has been closed)
        at org.postgresql.jdbc2.AbstractJdbc2Statement.checkClosed(AbstractJdbc2Statement.java:2634)
        at org.postgresql.jdbc2.AbstractJdbc2Statement.executeBatch(AbstractJdbc2Statement.java:2832)
        at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.close(JDBCOutputFormat.java:185)
        ... 3 more



My Java code is:


var.output(
        // build and configure OutputFormat
        JDBCOutputFormat
                .buildJDBCOutputFormat()
                .setDrivername("org.postgresql.Driver")
                .setDBUrl("jdbc:postgresql://127.0.0.1:5432/test")
                .setUsername("postgres")
                .setPassword("")
                .setQuery("SELECT upsert_nsp_db(?,?,?);") // lat, lng, val
                .finish()
        );



My upsert function is the following:


CREATE FUNCTION upsert_db(lat DOUBLE PRECISION, lng DOUBLE PRECISION, m FLOAT) RETURNS VOID AS
$$
BEGIN
    LOOP
        -- first try to update the key
        UPDATE nsp_db SET mean = m WHERE ST_Equals(location, ST_SetSRID(ST_MakePoint(lng,lat),26918));
        IF found THEN
            RETURN;
        END IF;
        -- not there, so try to insert the key
        -- if someone else inserts the same key concurrently,
        -- we could get a unique-key failure
        BEGIN
            INSERT INTO nsp_db (location, mean) VALUES (ST_SetSRID(ST_MakePoint(lng,lat),26918), m);
            RETURN;
        EXCEPTION WHEN unique_violation THEN
            -- do nothing, and loop to try the UPDATE again
        END;
    END LOOP;
END;
$$
LANGUAGE plpgsql;


I suspect it is some kind of timeout, but is there any way I could solve  
this issue?

Thanks!

Re: PostgreSQL JDBC Sink, "writeRecord() failed" and "Batch element cancelled" on upsert

rmetzger0
Hi,

sorry for the late reply. I'm not a PostgreSQL expert, but I'll try my best. Also, I'm not aware of many users of the JDBC output format, so I guess it's quite likely that there are open issues with it.

The exceptions have been thrown by the PostgreSQL JDBC driver.
As far as I can see, there are two exceptions: one in the writeRecord() call and one in the close() call. I think we can ignore the exception in the close() call, because it's somewhat expected that we cannot execute further queries once part of the batch has failed.
I suspect that the batched inserts performed by the OutputFormat are not compatible with the upsert function. (Have a look at the code to see how we do batched inserts: https://github.com/apache/flink/blob/b904b0041cf97b2c6181b1985afc457ed01cf626/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java)
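
The BatchUpdateException in the trace only says that the batch element was aborted and asks the caller to invoke getNextException for the actual cause. A minimal sketch of walking that chain with plain java.sql follows. The class and method names (SqlExceptionChain, messages) are hypothetical and are not part of Flink or the PostgreSQL driver.

```java
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink or the PostgreSQL JDBC driver.
class SqlExceptionChain {

    /**
     * Returns "SQLState: message" for the given exception and for every
     * exception linked to it through getNextException(), in chain order.
     */
    static List<String> messages(SQLException e) {
        List<String> out = new ArrayList<>();
        for (SQLException cur = e; cur != null; cur = cur.getNextException()) {
            out.add(cur.getSQLState() + ": " + cur.getMessage());
        }
        return out;
    }
}
```

Calling SqlExceptionChain.messages(e) in a catch block around executeBatch() and logging the result should surface the server-side error that the top-level message hides.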

I would recommend prototyping a new OutputFormat that performs regular (non-batched) queries, to see whether that works with upserts.
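
A rough sketch of what such a non-batched prototype could look like, using only java.sql. The class name UnbatchedJdbcWriter is hypothetical. In Flink this logic would live in a class implementing the OutputFormat interface (open, writeRecord, close); that wiring is left out here. One possible reason for the incompatibility, judging only from the trace, is that the failure is raised from BatchResultHandler.handleResultRows, so the driver seems to object to rows coming back inside a batch, and a SELECT always returns a row. That is an assumption, not a confirmed diagnosis.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Hypothetical prototype: runs one statement per record instead of
// collecting records with addBatch() and sending them with executeBatch().
class UnbatchedJdbcWriter implements AutoCloseable {
    private final Connection connection;
    private final PreparedStatement statement;

    UnbatchedJdbcWriter(Connection connection, String query) throws SQLException {
        this.connection = connection;
        this.statement = connection.prepareStatement(query);
    }

    /** Binds the fields to the query's placeholders (1-based) and runs it immediately. */
    void writeRecord(Object... fields) throws SQLException {
        for (int i = 0; i < fields.length; i++) {
            statement.setObject(i + 1, fields[i]);
        }
        // execute() accepts statements that return a result set,
        // such as "SELECT upsert_nsp_db(?,?,?)"; the result is ignored here.
        statement.execute();
    }

    @Override
    public void close() throws SQLException {
        try {
            statement.close();
        } finally {
            connection.close();
        }
    }
}
```

This trades throughput for simplicity, since every record costs a round trip to the database. If it works with the upsert function, batching can be reintroduced afterwards in a form the driver accepts.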

Robert





On Wed, Jan 14, 2015 at 1:16 PM, Benoît Hanotte <[hidden email]> wrote: