Will the order of message be guaranteed when using asyno IO?

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Will the order of message be guaranteed when using asyno IO?

wanglei2@geekplus.com

Read kafka message and  write the message to MySQL
Writing to database is the bottleneck when too much message is sent to kafka with high throughput. 

So i want to change the operator to asynchronously. 

 public void asyncInvoke(ObjectNode node, ResultFuture<Integer> resultFuture) throws Exception {
    SqlProvider sqlProvider = new SqlProvider();

String tableName = node.get("metadata").get("topic").asText();
String sql = sqlProvider.getSql(node);
CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
return Integer.valueOf(namedTemplate.update(sql, sqlProvider.getSqlParaMap()));
}
}).thenAccept((Integer res) -> {
resultFuture.complete(Collections.singletonList(res));
});

}

I want to know if there will be order issues. 
For example the kafaka message order is:
First:          update table table1 set a = 1
Second:     update table table1 set a = 2

It is possible that the final value in database is 1 instead of 2.

Thanks
Lei

Reply | Threaded
Open this post in threaded view
|

Re: Will the order of message be guaranteed when using asyno IO?

Till Rohrmann
Hi Lei,

this really depends on your user code. Flink will give you the guarantee that `asyncInvoke` is called in order (first `update table table1 set a = 1` and then `update table table1 set a = 2`). However, what `CompletableFuture.supplyAsync` does is not under control of Flink. Concretely, it will use the ForkJoinPool#commonPool() to execute the update method on your table. I believe that this executor won't give you any order guarantees wrt the submitted tasks. Hence, it should be able that you see 1 in your database instead of 2.

Cheers,
Till

On Wed, Aug 19, 2020 at 8:25 AM [hidden email] <[hidden email]> wrote:

Read kafka message and  write the message to MySQL
Writing to database is the bottleneck when too much message is sent to kafka with high throughput. 

So i want to change the operator to asynchronously. 

 public void asyncInvoke(ObjectNode node, ResultFuture<Integer> resultFuture) throws Exception {
    SqlProvider sqlProvider = new SqlProvider();

String tableName = node.get("metadata").get("topic").asText();
String sql = sqlProvider.getSql(node);
CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
return Integer.valueOf(namedTemplate.update(sql, sqlProvider.getSqlParaMap()));
}
}).thenAccept((Integer res) -> {
resultFuture.complete(Collections.singletonList(res));
});

}

I want to know if there will be order issues. 
For example the kafaka message order is:
First:          update table table1 set a = 1
Second:     update table table1 set a = 2

It is possible that the final value in database is 1 instead of 2.

Thanks
Lei

Reply | Threaded
Open this post in threaded view
|

Re: Will the order of message be guaranteed when using asyno IO?

Arvid Heise-3
In reply to this post by wanglei2@geekplus.com
Hi Lei,

there is no guarantee in the order of async tasks with asyncIO. It is the very nature of async programming that the tasks are potentially run in different threads and thus may end up overtaking each other.

With ordered async I/O, you only get the guarantee that results are fed back into Flink in the order the tasks were submitted. So if you use asyncIO to enrich your kafka records with SQL, the enriched kafka records stay in the same order as the initial kafka records.

If you absolutely want to try out async IO, then you should use optimistic locking techniques with some record versioning. For example, if you repeat the updates until they succeed, you could extend your example as follows:

update table table1 set a = 1, version = 2 where version = 1
update table table1 set a = 2, version = 3 where version = 2

The version number could be kept in the async Function (if you keyed by table) and added to your template.

However, I'd still recommend reviewing the architecture and maybe use a different sink, such as a key-value store, or try out a distributed SQL store instead.



On Wed, Aug 19, 2020 at 8:25 AM [hidden email] <[hidden email]> wrote:

Read kafka message and  write the message to MySQL
Writing to database is the bottleneck when too much message is sent to kafka with high throughput. 

So i want to change the operator to asynchronously. 

 public void asyncInvoke(ObjectNode node, ResultFuture<Integer> resultFuture) throws Exception {
    SqlProvider sqlProvider = new SqlProvider();

String tableName = node.get("metadata").get("topic").asText();
String sql = sqlProvider.getSql(node);
CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
return Integer.valueOf(namedTemplate.update(sql, sqlProvider.getSqlParaMap()));
}
}).thenAccept((Integer res) -> {
resultFuture.complete(Collections.singletonList(res));
});

}

I want to know if there will be order issues. 
For example the kafaka message order is:
First:          update table table1 set a = 1
Second:     update table table1 set a = 2

It is possible that the final value in database is 1 instead of 2.

Thanks
Lei



--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng