Re: What async database library does the asyncio code example use?
Posted by
Marco Villalobos-2 on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/What-async-database-library-does-the-asyncio-code-example-use-tp37370p37406.html
Thank you!
This was very helpful.
Sincerely,
Marco A. Villalobos
Hi Marco,
you don't need to use an async library; you could simply write your code in async fashion.
I'm trying to sketch the basic idea using any JDBC driver in the following (it's been a while since I used JDBC, so don't take it too literally).
private static class SampleAsyncFunction extends RichAsyncFunction<Integer, String> {
private transient ExecutorService executorService;
private transient Connection dbConn;
private transient PreparedStatement preparedStatement;
SampleAsyncFunction(<connection info>) {
this.<connection info > = <connection info>;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
executorService = Executors.newFixedThreadPool(30);
dbConn = DriverManager.getConnection( < connection info >);
preparedStatement = dbConn.prepareStatement("SELECT * FROM WHERE ...");
}
@Override
public void close() throws Exception {
super.close();
executorService.shutdownNow();
preparedStatement.close();
dbConn.close();
}
@Override
public void asyncInvoke(final Integer input, final ResultFuture<String> resultFuture) {
executorService.submit(() -> {
try {
preparedStatement.setInt(0, input);
final ResultSet resultSet = preparedStatement.executeQuery();
resultFuture.complete(Arrays.asList(resultSet.getString(0)));
} catch (SQLException e) {
resultFuture.completeExceptionally(e);
}
});
}
}
That's basically what all async libraries are doing behind the scenes anyways: spawn a thread pool and call the callbacks when a submitted task finishes.
To decide on the size of the thread pool, you should do some measurements without Flink on how many queries you can execute in parallel. Also keep in mind that if your async IO is run in parallel on the same task manager, that your threads will multiply (you can also use a static, shared executor, but it's a bit tricky to shutdown).
--
Arvid Heise | Senior Java Developer
Follow us @VervericaData
--
Stream Processing | Event Driven | Real Time
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng