// Needs java.sql.*, java.util.Collections, java.util.concurrent.*, and Flink's
// Configuration, ResultFuture and RichAsyncFunction on the import list.
private static class SampleAsyncFunction extends RichAsyncFunction<Integer, String> {
    private transient ExecutorService executorService;
    private transient Connection dbConn;
    private transient PreparedStatement preparedStatement;

    SampleAsyncFunction(<connection info>) {
        this.<connection info> = <connection info>;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        executorService = Executors.newFixedThreadPool(30);
        dbConn = DriverManager.getConnection(<connection info>);
        preparedStatement = dbConn.prepareStatement("SELECT * FROM WHERE ...");
    }

    @Override
    public void close() throws Exception {
        super.close();
        executorService.shutdownNow();
        preparedStatement.close();
        dbConn.close();
    }

    @Override
    public void asyncInvoke(final Integer input, final ResultFuture<String> resultFuture) {
        executorService.submit(() -> {
            // The statement is shared by all pool threads, so bind and execute under one lock.
            synchronized (preparedStatement) {
                try {
                    preparedStatement.setInt(1, input); // JDBC parameter indexes start at 1
                    try (ResultSet resultSet = preparedStatement.executeQuery()) {
                        // The cursor starts before the first row; next() moves onto it.
                        if (resultSet.next()) {
                            resultFuture.complete(Collections.singletonList(resultSet.getString(1)));
                        } else {
                            resultFuture.complete(Collections.emptyList());
                        }
                    }
                } catch (SQLException e) {
                    resultFuture.completeExceptionally(e);
                }
            }
        });
    }
}
Hi,
I believe the DatabaseClient in the example from [1] is just a hint that
whatever library you use (a database client, a REST client, or anything else)
should be asynchronous, or at least should not block the operator thread. The
library itself does not have to be non-blocking, as long as it runs on its own
thread pool and returns a future, or something similar, on which you can
register a callback that calls resultFuture.complete(...).
I actually wrote my own small wrapper library that calls
resultFuture.complete(...) from each of the library's threads.
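To illustrate, here is a rough sketch in plain Java of wrapping a blocking call in a thread pool and registering the completion callback on the returned future. It deliberately has no Flink dependency: ResultSink is a hypothetical stand-in for Flink's ResultFuture, and blockingLookup is a made-up placeholder for whatever blocking client call you have. Treat it as one possible shape under those assumptions, not as the way the Flink documentation does it.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncLookupSketch {

    /** Hypothetical stand-in for Flink's ResultFuture, so the sketch runs without Flink. */
    interface ResultSink<T> {
        void complete(Collection<T> result);
        void completeExceptionally(Throwable error);
    }

    /** Hypothetical blocking client call; a real one would hit a database or REST service. */
    static String blockingLookup(int key) {
        if (key < 0) {
            throw new IllegalArgumentException("negative key: " + key);
        }
        return "value-" + key;
    }

    /** Runs the blocking call on the given pool and forwards the outcome to the sink. */
    static void asyncInvoke(int key, ExecutorService pool, ResultSink<String> sink) {
        CompletableFuture
                .supplyAsync(() -> blockingLookup(key), pool)
                .whenComplete((value, error) -> {
                    // Exactly one of value/error is set once the pool thread finishes.
                    if (error != null) {
                        sink.completeExceptionally(error);
                    } else {
                        sink.complete(Collections.singletonList(value));
                    }
                });
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CompletableFuture<Collection<String>> seen = new CompletableFuture<>();
        asyncInvoke(42, pool, new ResultSink<String>() {
            @Override
            public void complete(Collection<String> result) {
                seen.complete(result);
            }

            @Override
            public void completeExceptionally(Throwable error) {
                seen.completeExceptionally(error);
            }
        });
        System.out.println(seen.join()); // prints [value-42]
        pool.shutdown();
    }
}
```

The calling thread returns from asyncInvoke immediately; only the pool threads block. The pool size then caps how many lookups run at once, so it should be sized together with the capacity of the async operator.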
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/asyncio.html