Flink Async io problem

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink Async io problem

venn

Hi Flink experts,

            I’m working flink async io program for stream join outer database(mysql)but found syncplease give some advice, or provide some async demo. thanks

 

asyncInvoke method are as follow:

@Override
public void asyncInvoke(AsyncUser asyncUser, ResultFuture<AsyncUser> resultFuture) throws Exception {
   
// 使用 asyncUser id 查询
   
ps.setString(1, asyncUser.getId());
   
ResultSet rs = ps.executeQuery();

   
CompletableFuture.supplyAsync(new Supplier<AsyncUser>() {
       
@Override
       
public AsyncUser get() {
           
try {
               
if (!rs.isClosed() && rs.next()) {
                   
asyncUser.setPhone(rs.getString(1));
               
}
            }
catch (SQLException e) {
                e.printStackTrace()
;
           
}
           
return asyncUser;
       
}
    }).thenAccept((AsyncUser tmp) -> {
        List<AsyncUser> list =
new ArrayList();
       
list.add(tmp);
       
resultFuture.complete(list);
   
});
}

 

 

 

Best, Venn

Reply | Threaded
Open this post in threaded view
|

Re: Flink Async io problem

Yun Gao
Hi Venn,

     I think `AsyncFunction#asyncInvoke` should be used to submit asynchronous tasks for the input records instead of executing the tasks directly. However, it seems that in the code fragment, the query is executed directly in the asyncInvoke method.

    I think you may also find more information in the document page [1]. A point might need to be noted is that in the example of the document page, the call to the `client#query` returns a Future, thus is is an asynchronous action instead of executing the query directly.

Best,
Yun

------------------------------------------------------------------
From:venn <[hidden email]>
Send Time:2019 Jul. 9 (Tue.) 19:54
To:user <[hidden email]>
Subject:Flink Async io problem

Hi Flink experts,

            I’m working flink async io program for stream join outer database(mysql)but found syncplease give some advice, or provide some async demo. thanks

 

asyncInvoke method are as follow:

@Override
public void asyncInvoke(AsyncUser asyncUser, ResultFuture<AsyncUser> resultFuture) throws Exception {
   
// 使用 asyncUser id 查询
   
ps.setString(1, asyncUser.getId());
   
ResultSet rs = ps.executeQuery();

   
CompletableFuture.supplyAsync(new Supplier<AsyncUser>() {
       
@Override
       
public AsyncUser get() {
           
try {
               
if (!rs.isClosed() && rs.next()) {
                   
asyncUser.setPhone(rs.getString(1));
               
}
            }
catch (SQLException e) {
                e.printStackTrace()
;
           
}
           
return asyncUser;
       
}
    }).thenAccept((AsyncUser tmp) -> {
        List<AsyncUser> list =
new ArrayList();
       
list.add(tmp);
       
resultFuture.complete(list);
   
});
}

 

 

 

Best, Venn


Reply | Threaded
Open this post in threaded view
|

Re: Flink Async io problem

Yun Gao
In reply to this post by venn

Hi Venn,

     I think `AsyncFunction#asyncInvoke` should be used to submit asynchronous tasks for the input records instead of executing the tasks directly. However, it seems that in the code fragment, the query is executed directly in the asyncInvoke method.

    I think you may also find more information in the document page [1]. A point might need to be noted is that in the example of the document page, the call to the `client#query` returns a Future, thus is is an asynchronous action instead of executing the query directly.

Best,
Yun



------------------------------------------------------------------
From:venn <[hidden email]>
Send Time:2019 Jul. 9 (Tue.) 19:54
To:user <[hidden email]>
Subject:Flink Async io problem

Hi Flink experts,

            I’m working flink async io program for stream join outer database(mysql)but found syncplease give some advice, or provide some async demo. thanks

 

asyncInvoke method are as follow:

@Override
public void asyncInvoke(AsyncUser asyncUser, ResultFuture<AsyncUser> resultFuture) throws Exception {
   
// 使用 asyncUser id 查询
   
ps.setString(1, asyncUser.getId());
   
ResultSet rs = ps.executeQuery();

   
CompletableFuture.supplyAsync(new Supplier<AsyncUser>() {
       
@Override
       
public AsyncUser get() {
           
try {
               
if (!rs.isClosed() && rs.next()) {
                   
asyncUser.setPhone(rs.getString(1));
               
}
            }
catch (SQLException e) {
                e.printStackTrace()
;
           
}
           
return asyncUser;
       
}
    }).thenAccept((AsyncUser tmp) -> {
        List<AsyncUser> list =
new ArrayList();
       
list.add(tmp);
       
resultFuture.complete(list);
   
});
}

 

 

 

Best, Venn