async io parallelism


Alexey Trenikhun
Hello,
Let's say my elements are simple key-value pairs coming from Kafka, where they were partitioned by "key". I process them with a KeyedProcessFunction (keyed by the same "key"), enrich them with an ordered RichAsyncFunction, pass the output to another KeyedProcessFunction (keyed by the same "key"), and then write to a Kafka topic, again partitioned by the same "key". Something like this:

FlinkKafkaConsumer -> keyBy("key") -> Intake(KeyedProcessFunction) -> AsyncDataStream.orderedWait() -> keyBy("key")->Output(KeyedProcessFunction)->FlinkKafkaProducer

Will it preserve the order of events with the same "key"?
  • Will the Output function receive elements with the same "key" in the same order as they were originally in Kafka?
  • Will FlinkKafkaProducer write elements with the same "key" in the same order as they were originally in Kafka?
  • Does it depend on the parallelism of the async I/O operator? The documentation says "the stream order is preserved", but if there are multiple parallel instances of the async function, does that mean order relative to each single instance, or total stream order?
Thanks,
Alexey

Re: async io parallelism

Arvid Heise-3
Hi Alexey,

The short answer is: order is preserved in all cases.

Basically, ordered async I/O maintains an internal FIFO queue where all pending elements reside. All async results are saved into this queue, but elements are only emitted once the head element has a result.

So assume you have three input records i1, i2, i3, and their results arrive asynchronously in the order o2, o1, o3, 100 ms apart. Nothing is emitted when o2 arrives at 100 ms; o1 and o2 are emitted at 200 ms, and o3 at 300 ms.
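
To make the head-of-queue rule concrete, here is a minimal sketch in plain Python. It is only a toy model of the behavior described above, not Flink's implementation; the class name OrderedAsyncQueue and its methods are hypothetical, and the millisecond values are just labels taken from the example.

```python
from collections import deque


class OrderedAsyncQueue:
    """Toy model of an ordered async buffer (hypothetical, not Flink code)."""

    def __init__(self):
        self._pending = deque()   # entries in arrival order: [element, result or None]
        self._by_element = {}     # lookup from element to its queue entry

    def add(self, element):
        entry = [element, None]
        self._pending.append(entry)
        self._by_element[element] = entry

    def complete(self, element, result):
        """Store a result, then emit every finished entry at the head of the queue."""
        self._by_element.pop(element)[1] = result
        emitted = []
        while self._pending and self._pending[0][1] is not None:
            emitted.append(self._pending.popleft()[1])
        return emitted


queue = OrderedAsyncQueue()
for element in ("i1", "i2", "i3"):
    queue.add(element)

# Results arrive in the order o2, o1, o3, as in the example above.
timeline = [
    (100, queue.complete("i2", "o2")),  # head (i1) still pending: nothing emitted
    (200, queue.complete("i1", "o1")),  # head done: o1 and o2 are emitted together
    (300, queue.complete("i3", "o3")),  # o3 is emitted
]
for at_ms, emitted in timeline:
    print(at_ms, emitted)
```

The result for i2 is held back until i1 finishes, which is why the output order matches the input order even though the results arrived out of order.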

Best,

Arvid

On Sat, Feb 22, 2020 at 2:22 AM Alexey Trenikhun <[hidden email]> wrote:
[...]

Re: async io parallelism

Alexey Trenikhun
Arvid, thank you.
So is there a single FIFO instance per async I/O operator, regardless of the operator's parallelism?
Thanks,
Alexey


From: Arvid Heise <[hidden email]>
Sent: Saturday, February 22, 2020 1:23:01 PM
To: Alexey Trenikhun <[hidden email]>
Cc: [hidden email] <[hidden email]>
Subject: Re: async io parallelism
 
[...]

Re: async io parallelism

Arvid Heise-3
Hi Alexey,

No, there are as many instances as the configured parallelism, but each one operates on its own key group range and maintains the order within it.
So messages with the same key are never reordered by ordered async I/O. Messages with different keys may be processed independently and can change order relative to each other.
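
The per-instance behavior can be sketched the same way in plain Python. This is a toy model under stated assumptions, not Flink code: OrderedSubtask, subtask_for, and PARALLELISM are hypothetical names, and the routing function is only a stand-in for Flink's key-group assignment. The one property it relies on is that equal keys always go to the same instance.

```python
from collections import deque

PARALLELISM = 2  # hypothetical number of parallel async operator instances


def subtask_for(key):
    # Stand-in for Flink's key-group assignment (an assumption, not Flink's real hash):
    # any deterministic function of the key sends equal keys to the same instance.
    return ord(key[0]) % PARALLELISM


class OrderedSubtask:
    """One parallel instance with its own FIFO of [record, done] entries."""

    def __init__(self):
        self.pending = deque()

    def add(self, record):
        self.pending.append([record, False])

    def complete(self, record):
        # Mark the record as finished, then emit every finished entry at the head.
        for entry in self.pending:
            if entry[0] == record:
                entry[1] = True
                break
        emitted = []
        while self.pending and self.pending[0][1]:
            emitted.append(self.pending.popleft()[0])
        return emitted


subtasks = [OrderedSubtask() for _ in range(PARALLELISM)]

# Records are (key, sequence number) pairs, listed in arrival order.
arrivals = [("a", 1), ("b", 1), ("a", 2), ("b", 2)]
for record in arrivals:
    subtasks[subtask_for(record[0])].add(record)

# Async results come back in a different order than the records arrived.
downstream = []
for record in [("b", 2), ("a", 2), ("b", 1), ("a", 1)]:
    downstream.extend(subtasks[subtask_for(record[0])].complete(record))

print(downstream)
```

In this run the records for key "b" reach downstream before those for key "a", although ("a", 1) arrived first, while the sequence numbers within each key stay in their original order.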

Best,

Arvid

On Mon, Feb 24, 2020 at 4:43 PM Alexey Trenikhun <[hidden email]> wrote:
[...]