How to set unorderedWait/orderedWait properties in Table API when using Async I/O

StevenZheng
Hi all,
I'm using the Blink planner (Flink 1.9) and have created an AsyncJdbcTableSource class that implements LookupableTableSource. After overriding getAsyncLookupFunction(), I found that the results of the async method (implemented with Vert.x) are emitted in order.

However, I don't need the stream order to be preserved; I just want the result records to be emitted out of order to improve processing speed. In the DataStream API I can easily set the result order guarantee (https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html#order-of-results). My question is:

How can I set this in the Table API or SQL API with the Blink planner?

Thanks. Regards
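
For readers unfamiliar with the two result-order modes mentioned in the question (orderedWait and unorderedWait in the DataStream API), the difference can be illustrated with a small plain-Java simulation. This is only a sketch and does not use Flink: the class ResultOrderDemo, the Request type, and the latencies are hypothetical, and all requests are assumed to start at the same time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class ResultOrderDemo {
    /** A pending async request: its position in the input and how long its lookup takes. */
    static final class Request {
        final int index;
        final long latencyMs;

        Request(int index, long latencyMs) {
            this.index = index;
            this.latencyMs = latencyMs;
        }
    }

    /** Ordered mode: results leave in input order, so a slow request holds back later ones. */
    static List<Integer> orderedEmission(List<Request> requests) {
        List<Integer> emitted = new ArrayList<>();
        for (Request r : requests) {
            emitted.add(r.index);
        }
        return emitted;
    }

    /** Unordered mode: results leave as soon as each request completes (shortest latency first). */
    static List<Integer> unorderedEmission(List<Request> requests) {
        List<Request> byCompletion = new ArrayList<>(requests);
        byCompletion.sort(Comparator.comparingLong((Request r) -> r.latencyMs));
        List<Integer> emitted = new ArrayList<>();
        for (Request r : byCompletion) {
            emitted.add(r.index);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Request> requests = Arrays.asList(
                new Request(0, 300), new Request(1, 100), new Request(2, 200));
        System.out.println(orderedEmission(requests));   // prints [0, 1, 2]
        System.out.println(unorderedEmission(requests)); // prints [1, 2, 0]
    }
}
```

In the ordered case, records 1 and 2 are ready early but must wait for record 0, which is the latency cost the question is trying to avoid.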

Fwd: How to set unorderedWait/orderedWait properties in Table API when using Async I/O

StevenZheng


---------- Forwarded message ---------
From: StevenZheng <[hidden email]>
Date: Fri, Feb 28, 2020 at 6:30 PM
Subject: Re: How to set unorderedWait/orderedWait properties in Table API when using Async I/O
To: Danny Chan <[hidden email]>


Thanks Danny. I do run my lookup function in a single thread, as in this pull request: https://github.com/apache/flink/pull/10356, and my custom source is a JDBC table source.

But I would still like to know how to define the return order of the async results, and whether that is possible at all.

On Thu, Feb 27, 2020 at 9:38 PM, Danny Chan <[hidden email]> wrote:
The lookup event is indeed triggered by the AsyncWaitOperator; the Blink AsyncLookupJoinRunner is nested inside it.
However, we only generate the AsyncWaitOperator when LookupableTableSource#isAsyncEnabled returns true, and currently only InMemoryLookupableTableSource supports that.

One thing to note is that if your source is custom, you should execute the logic in LookupableTableSource in a separate thread.

So:
  1. Which dimension table source do you use?
  2. If you customized your source, did you run it in a separate thread?

Best,
Danny Chan
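
The advice above about running the lookup logic in a separate thread can be sketched in plain Java as follows. This is a minimal illustration without any Flink dependency: the class AsyncLookupSketch, the blockingLookup helper, and the pool size are hypothetical, and the eval method only loosely mirrors the idea of completing a CompletableFuture handed in by the caller.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncLookupSketch {
    // Dedicated pool for lookups; the size of 4 is an arbitrary assumption.
    private final ExecutorService lookupPool = Executors.newFixedThreadPool(4);

    /** Hypothetical stand-in for a blocking JDBC query. */
    private String blockingLookup(int key) {
        return "row-" + key;
    }

    /**
     * Hands the blocking call to the pool and completes the caller's future
     * from the pool thread, so the calling thread never blocks on the lookup.
     */
    public void eval(CompletableFuture<String> resultFuture, int key) {
        CompletableFuture
                .supplyAsync(() -> blockingLookup(key), lookupPool)
                .whenComplete((row, error) -> {
                    if (error != null) {
                        resultFuture.completeExceptionally(error);
                    } else {
                        resultFuture.complete(row);
                    }
                });
    }

    /** Releases the pool threads once no more lookups are expected. */
    public void close() {
        lookupPool.shutdown();
    }

    public static void main(String[] args) {
        AsyncLookupSketch lookup = new AsyncLookupSketch();
        CompletableFuture<String> result = new CompletableFuture<>();
        lookup.eval(result, 42);
        System.out.println(result.join()); // prints row-42
        lookup.close();
    }
}
```

If the lookup ran directly on the calling thread instead, each request would block the next one, and the async path would bring no benefit.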

Re: How to set unorderedWait/orderedWait properties in Table API when using Async I/O

Jark Wu-3
Hi,

Ordering is very important in streaming SQL, because accumulate and retract messages are emitted in order.
If messages arrive out of order, the result will be wrong. Think of applying an unordered changelog: the result will be non-deterministic.
That's why we only support "ordered" mode for async lookup joins.
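
To make the changelog argument concrete, here is a toy plain-Java model. It is a sketch under simplifying assumptions, not Flink's implementation: the class ChangelogOrderDemo, the Change type, and the rule that an accumulate message sets the value for a key while a retract message removes the key are all hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ChangelogOrderDemo {
    /** One changelog message: accumulate (set) or retract (withdraw) the count for a key. */
    static final class Change {
        final boolean accumulate;
        final String key;
        final long count;

        Change(boolean accumulate, String key, long count) {
            this.accumulate = accumulate;
            this.key = key;
            this.count = count;
        }
    }

    /** Applies the messages one by one, in the given order, to build the final view. */
    static Map<String, Long> materialize(List<Change> changelog) {
        Map<String, Long> view = new HashMap<>();
        for (Change c : changelog) {
            if (c.accumulate) {
                view.put(c.key, c.count);
            } else {
                view.remove(c.key);
            }
        }
        return view;
    }

    public static void main(String[] args) {
        Change acc1 = new Change(true, "user-1", 1);
        Change ret1 = new Change(false, "user-1", 1);
        Change acc2 = new Change(true, "user-1", 2);

        // Emitted order: the old count is retracted before the new one is accumulated.
        System.out.println(materialize(Arrays.asList(acc1, ret1, acc2))); // prints {user-1=2}
        // Reordered: the late retraction wipes out the newer count.
        System.out.println(materialize(Arrays.asList(acc1, acc2, ret1))); // prints {}
    }
}
```

The same three messages give two different final views depending only on arrival order, which is the kind of wrong result described above.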

Support for "unordered" mode is on the roadmap, but it will be more complex, because the planner has to check that it doesn't affect the order of acc/retract messages (e.g. the input is just an append-only stream).

I created https://issues.apache.org/jira/browse/FLINK-16332 to track this feature. 

Best,
Jark


Re: How to set unorderedWait/orderedWait properties in Table API when using Async I/O

StevenZheng
Thanks Jark. The unordered mode would be useful in some cases.
