I read messages from Kafka and write each message to MySQL.
Writing to the database becomes the bottleneck when messages arrive in Kafka at a high rate.
So I want to make the write operator asynchronous:
public void asyncInvoke(ObjectNode node, ResultFuture<Integer> resultFuture) throws Exception {
    SqlProvider sqlProvider = new SqlProvider();
    String tableName = node.get("metadata").get("topic").asText();
    String sql = sqlProvider.getSql(node);
    // Run the blocking update off the operator thread
    // (supplyAsync without an executor uses the common ForkJoinPool).
    CompletableFuture
        .supplyAsync(() -> namedTemplate.update(sql, sqlProvider.getSqlParaMap()))
        .whenComplete((Integer res, Throwable err) -> {
            if (err != null) {
                // Report the failure instead of leaving resultFuture uncompleted.
                resultFuture.completeExceptionally(err);
            } else {
                resultFuture.complete(Collections.singletonList(res));
            }
        });
}
I want to know whether this can cause ordering issues.
For example, suppose the Kafka messages arrive in this order:
First: update table table1 set a = 1
Second: update table table1 set a = 2
Is it possible that the final value in the database ends up as 1 instead of 2?
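
To make the concern concrete, here is a minimal standalone sketch in plain Java (no Flink, no MySQL). Everything in it is an assumption for illustration: the class OrderedAsyncWrites, the AtomicInteger standing in for column a, the artificial delays, and the per-table chaining helper submitOrdered are all hypothetical and not part of my job or of any library. It shows two independent futures racing, and one possible way to serialize writes per table by chaining each write onto the previous one.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class OrderedAsyncWrites {
    // Stand-in for column "a" of table1; a real job would issue the SQL update here.
    static final AtomicInteger columnA = new AtomicInteger();

    // Last submitted write per table (hypothetical helper state).
    static final Map<String, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();

    // Simulates a database update that takes delayMillis to finish.
    static void write(int value, long delayMillis) {
        try {
            Thread.sleep(delayMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        columnA.set(value);
    }

    // Two independent futures: whichever write finishes last wins.
    static int runUnordered() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        columnA.set(0);
        CompletableFuture<Void> first = CompletableFuture.runAsync(() -> write(1, 200), pool);
        CompletableFuture<Void> second = CompletableFuture.runAsync(() -> write(2, 0), pool);
        CompletableFuture.allOf(first, second).join();
        pool.shutdown();
        return columnA.get();
    }

    // Chains each write after the previous write for the same table.
    // Error handling is omitted: a failed write also fails everything chained after it.
    static CompletableFuture<Void> submitOrdered(String table, Runnable task, ExecutorService pool) {
        return tails.compute(table, (key, tail) ->
            (tail == null ? CompletableFuture.<Void>completedFuture(null) : tail)
                .thenRunAsync(task, pool));
    }

    // Same two writes, but the second cannot start before the first has finished.
    static int runOrdered() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        columnA.set(0);
        submitOrdered("table1", () -> write(1, 200), pool);
        submitOrdered("table1", () -> write(2, 0), pool).join();
        pool.shutdown();
        return columnA.get();
    }

    public static void main(String[] args) {
        System.out.println("unordered: " + runUnordered());
        System.out.println("ordered:   " + runOrdered());
    }
}
```

With these delays I would expect the unordered run to leave the value 1, because the first write is still sleeping when the second one completes, while the chained run always ends with 2. The chaining serializes writes to the same table only; writes to different tables can still run in parallel.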
Thanks
Lei