Re: Should I use static database connection pool?

Posted by John Smith on Oct 16, 2019; 6:31pm
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Should-I-use-static-database-connection-pool-tp30537p30548.html

Xin, the open()/close() cycle of a sink function is only called once for each sink instance, so I don't think you even need to make your pool static. Can someone confirm this?
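To make the idea concrete, here is a minimal sketch of a pool held as an instance field, created in open() and released in close(). It is plain Java and only mimics the lifecycle; DemoPool and PerInstanceSink are hypothetical names, and DemoPool stands in for a real pool such as ComboPooledDataSource. It is not meant as the definitive way to write the sink.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a real connection pool (e.g. ComboPooledDataSource). */
class DemoPool implements AutoCloseable {
    private final List<String> written = new ArrayList<>();
    private boolean closed = false;

    void write(String row) {
        if (closed) {
            throw new IllegalStateException("pool already closed");
        }
        written.add(row);
    }

    int writtenCount() { return written.size(); }

    boolean isClosed() { return closed; }

    @Override
    public void close() { closed = true; }
}

/** Mimics the open()/invoke()/close() lifecycle of a rich sink function. */
class PerInstanceSink {
    // Instance field, not static: each sink instance owns its own pool.
    // transient because the function object itself may be serialized and shipped.
    private transient DemoPool pool;

    /** Called once before the first record: create the pool here. */
    void open() { pool = new DemoPool(); }

    /** Called once per record. */
    void invoke(String record) { pool.write(record); }

    /** Called once on shutdown: release the pool here. */
    void close() {
        if (pool != null) {
            pool.close();
            pool = null;
        }
    }

    /** Exposed only so the sketch can be inspected. */
    DemoPool pool() { return pool; }
}
```

With this shape, two sink instances in the same JVM get two separate pools, and closing one does not affect the other.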

Miki, the JDBC connector lacks some functionality. For instance, it only flushes a batch when the batch interval is reached. So if you set the batch interval to 5 and you get 6 records, the 6th one will not be flushed to the DB until you get another 4. You can see that Xin's code adds a timer-based flush as well. Also, the JDBC connector does not support checkpointing, if you ever need that, which is a surprise because most JDBC databases have transactions, so it would be nice to have.
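The difference between a count-only flush and a count-or-time flush can be sketched in plain Java as follows. BatchBuffer and all of its members are hypothetical names for illustration, the "database" is just an in-memory list, and time is passed in explicitly so the behaviour is easy to follow. This is not the connector's actual code.

```java
import java.util.ArrayList;
import java.util.List;

/** Buffers records and flushes on batch size or on elapsed time, whichever comes first. */
class BatchBuffer {
    private final int batchSize;
    private final long maxDelayMillis;
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> flushed = new ArrayList<>(); // stand-in for the DB
    private long lastFlushMillis;

    BatchBuffer(int batchSize, long maxDelayMillis, long nowMillis) {
        this.batchSize = batchSize;
        this.maxDelayMillis = maxDelayMillis;
        this.lastFlushMillis = nowMillis;
    }

    /** Count-based trigger: flush as soon as the batch is full. */
    void add(String record, long nowMillis) {
        pending.add(record);
        if (pending.size() >= batchSize) {
            flush(nowMillis);
        }
    }

    /** Time-based trigger, meant to be called periodically by a timer. */
    void onTimer(long nowMillis) {
        if (!pending.isEmpty() && nowMillis - lastFlushMillis >= maxDelayMillis) {
            flush(nowMillis);
        }
    }

    private void flush(long nowMillis) {
        flushed.add(new ArrayList<>(pending));
        pending.clear();
        lastFlushMillis = nowMillis;
    }

    int pendingCount() { return pending.size(); }

    int flushedBatches() { return flushed.size(); }
}
```

With a batch size of 5, adding 6 records flushes one batch and leaves the 6th record pending. Without the onTimer() path it would sit there until 4 more records arrive; with it, the leftover record goes out once maxDelayMillis has passed.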

On Wed, 16 Oct 2019 at 10:58, miki haiat <[hidden email]> wrote:
If it's a sink that uses JDBC, why not use the Flink JDBC sink connector?
 

On Wed, Oct 16, 2019, 17:03 Xin Ma <[hidden email]> wrote:
I have watched one of the recent Flink Forward videos, Apache Flink Worst Practices by Konstantin Knauf. The talk helped me a lot, and it mentions that we should avoid using static variables to share state between tasks.

So should I also avoid a static database connection? I am currently facing a weird issue: the database connection is lost at some point, which brings the whole job down.

I have created a database tool like this, 

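import com.mchange.v2.c3p0.ComboPooledDataSource;
import java.beans.PropertyVetoException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.Properties;

public class Phoenix {

    // Static pool: one per JVM (per class loader), shared by every task running in it.
    private static final ComboPooledDataSource dataSource = new ComboPooledDataSource();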
    static {
        try {
            dataSource.setDriverClass(Environment.get("phoenix.jdbc.driverClassName", "org.apache.phoenix.jdbc.PhoenixDriver"));
            dataSource.setJdbcUrl(Environment.get("phoenix.jdbc.url", null));
            dataSource.setMaxPoolSize(200);
            dataSource.setMinPoolSize(10);
            Properties properties = new Properties();
            properties.setProperty("user", "---");
            properties.setProperty("password", "---");
            dataSource.setProperties(properties);
        } catch (PropertyVetoException e) {
            // Keep the original exception as the cause so the stack trace is not lost.
            throw new RuntimeException("phoenix datasource conf error", e);
        }
    }

    private static Connection getConn() throws SQLException {
        return dataSource.getConnection();
    }

    public static <T> T executeQuery(String sql, Caller<T> caller) throws SQLException {
        // .. execution logic
    }

    public static int executeUpdateWithTx(List<String> sqlList) throws SQLException {
        // .. update logic
    }

}

Then I implemented my customized sink function like this,

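import com.google.common.collect.Lists; // assuming Lists comes from Guava
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CustomizedSink extends RichSinkFunction<Record> {

    private static final Logger LOG = LoggerFactory.getLogger("userFlinkLogger");
    private static final int batchInsertSize = 5000;
    private static final long flushInterval = 60 * 1000L;
    private long lastFlushTime;
    private BatchCommit batchCommit;
    private ConcurrentLinkedQueue<Object> cacheQueue;
    private ExecutorService threadPool;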

    @Override
    public void open(Configuration parameters) throws Exception {
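        cacheQueue = new ConcurrentLinkedQueue<>();
        threadPool = Executors.newFixedThreadPool(1);
        batchCommit = new BatchCommit();
        // Start the flush clock here; otherwise lastFlushTime is 0 and the first record triggers a flush.
        lastFlushTime = System.currentTimeMillis();
        super.open(parameters);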
    }

    @Override
    public void invoke(Record record) throws Exception {
        cacheQueue.add(record);
        if (cacheQueue.size() >= batchInsertSize ||
            System.currentTimeMillis() - lastFlushTime >= flushInterval) {
            lastFlushTime = System.currentTimeMillis();
            threadPool.execute(batchCommit);
        }
    }

    private class BatchCommit implements Runnable {
        @Override
        public void run() {
            try {
                int ct;
                synchronized (cacheQueue) {
                    List<String> sqlList = Lists.newArrayList();
                    for (ct = 0; ct < batchInsertSize; ct++) {
                        Object obj = cacheQueue.poll();
                        if (obj == null) {
                            break;
                        }
                        sqlList.add(generateBatchSql((Record) obj));
                    }
                    Phoenix.executeUpdateWithTx(sqlList);
                }
                LOG.info("Batch insert " + ct + " cache records");
            } catch (Exception e) {
                LOG.error("Batch insert error: ", e);
            }
        }

        private String generateBatchSql(Record record) {
            // business logic
        }
    }
}

Are there any good ideas for refactoring this code?

Best,
Kevin