I read messages from Kafka and write them to MySQL. Writing to the database becomes the bottleneck when messages arrive in Kafka at high throughput, so I want to make the operator asynchronous:

public void asyncInvoke(ObjectNode node, ResultFuture<Integer> resultFuture) throws Exception { SqlProvider sqlProvider = new SqlProvider();

I want to know whether this can cause ordering issues. For example, suppose the Kafka messages arrive in this order:

First: update table table1 set a = 1
Second: update table table1 set a = 2

Could the final value in the database end up as 1 instead of 2?

Thanks,
Lei
Hi Lei,

this really depends on your user code. Flink guarantees that `asyncInvoke` is called in order (first `update table table1 set a = 1`, then `update table table1 set a = 2`). However, what `CompletableFuture.supplyAsync` does is outside Flink's control. Concretely, it uses `ForkJoinPool#commonPool()` to execute the update on your table. I believe this executor gives no ordering guarantees with respect to the submitted tasks. Hence, it is possible that you end up with 1 in your database instead of 2.

Cheers,
Till
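One way to illustrate the point about the executor: `CompletableFuture.supplyAsync` also accepts an explicit `Executor`, and a single-threaded executor runs submitted tasks sequentially in submission order. The following is only a minimal sketch under that assumption. The class `OrderedAsyncUpdates`, the method `applyUpdatesInOrder`, and the in-memory `columnA` (standing in for the real JDBC update) are hypothetical names, not part of the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class OrderedAsyncUpdates {
    // Hypothetical stand-in for column `a` of table1; a real job would run a JDBC update here.
    static final AtomicInteger columnA = new AtomicInteger(0);

    // Submits one async "update" per value and returns the final value of the column.
    static int applyUpdatesInOrder(List<Integer> values) {
        // A single worker thread executes tasks one at a time, in submission order.
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            List<CompletableFuture<Integer>> futures = new ArrayList<>();
            for (int value : values) {
                // Passing the executor explicitly avoids ForkJoinPool.commonPool().
                futures.add(CompletableFuture.supplyAsync(() -> {
                    columnA.set(value); // simulated "update table1 set a = value"
                    return 1;           // simulated number of affected rows
                }, executor));
            }
            // Wait for all submitted updates to finish.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            return columnA.get();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("final a = " + applyUpdatesInOrder(List.of(1, 2)));
    }
}
```

The trade-off is that a single thread serializes every write, so this keeps the order but gives up most of the parallelism. A variant with one single-threaded executor per key or partition would preserve per-key order while still writing different keys concurrently.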
Hi Lei,

async I/O gives no guarantee about the order in which the async tasks execute. It is the very nature of async programming that tasks may run in different threads and can therefore overtake each other. With ordered async I/O, the only guarantee is that results are fed back into Flink in the order in which the tasks were submitted. So if you use async I/O to enrich your Kafka records via SQL, the enriched records stay in the same order as the original Kafka records.

If you absolutely want to try async I/O for the writes, you should use optimistic locking with some form of record versioning. For example, if you repeat each update until it succeeds, you could extend your example as follows:

update table table1 set a = 1, version = 2 where version = 1
update table table1 set a = 2, version = 3 where version = 2

The version number could be kept in the async function (if you key by table) and added to your SQL template.

However, I'd still recommend reviewing the architecture and perhaps using a different sink, such as a key-value store, or trying a distributed SQL store instead.
-- Arvid Heise | Senior Java Developer
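The versioned-update idea can be sketched without a database. In the example below, `VersionedRow.tryUpdate` mimics a statement of the form `set a = ?, version = ? where version = ?` and returns the number of affected rows (0 or 1). All names (`VersionedRow`, `tryUpdate`, `updateWithRetry`, `demo`) are hypothetical, and the in-memory row is an assumption standing in for the MySQL table. The second update is deliberately submitted first to show that it only takes effect after the first one.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class VersionedRow {
    private int a = 0;
    private int version = 1;

    // Mimics: set a = newA, version = expectedVersion + 1 where version = expectedVersion.
    // Returns the number of "affected rows": 1 on success, 0 if the version did not match.
    synchronized int tryUpdate(int newA, int expectedVersion) {
        if (version != expectedVersion) {
            return 0;
        }
        a = newA;
        version = expectedVersion + 1;
        return 1;
    }

    synchronized int getA() { return a; }

    synchronized int getVersion() { return version; }

    // Repeats the versioned update until it succeeds.
    // A production version would add a back-off and a retry limit.
    static CompletableFuture<Void> updateWithRetry(
            VersionedRow row, int newA, int expectedVersion, ExecutorService executor) {
        return CompletableFuture.runAsync(() -> {
            while (row.tryUpdate(newA, expectedVersion) == 0) {
                Thread.yield(); // version not reached yet, try again
            }
        }, executor);
    }

    // Submits the two updates in the "wrong" order and waits for both.
    static VersionedRow demo() {
        VersionedRow row = new VersionedRow();
        // Two threads, so the retrying task cannot starve the one it waits for.
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<Void> second = updateWithRetry(row, 2, 2, executor);
            CompletableFuture<Void> first = updateWithRetry(row, 1, 1, executor);
            CompletableFuture.allOf(first, second).join();
            return row;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        VersionedRow row = demo();
        System.out.println("a = " + row.getA() + ", version = " + row.getVersion());
    }
}
```

Note that each retrying task occupies a thread until its predecessor has committed, so the pool must be large enough for the tasks that can be in flight at once. That cost is part of why a different sink may be the simpler choice.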