How does Flink recover uncommitted Kafka offsets in AsyncIO?


xwang355
Hi Flink experts,

I am prototyping a real-time system that reads from a Kafka source with Flink and calls out to an external system as part of the event processing. One of the most important requirements is that reading from Kafka should NEVER stall, even if some slow async external calls are holding on to certain Kafka offsets. At-least-once processing is good enough.

Currently, I am using AsyncIO with a thread pool of size 20. My understanding is that if I use orderedWait with a large 'capacity', consumption from Kafka should continue even if some external calls experience slowness (holding the offsets), as long as the capacity is not exhausted.

(From my own reading of the Flink source code, the 'capacity' argument of orderedWait translates to the size of the OrderedStreamElementQueue.)
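The behavior I expected can be pictured with a plain-Java model (this is NOT Flink code; `OrderedQueueModel` is a made-up class sketching, under my reading of the source, what the ordered queue does): elements keep entering while capacity remains, but nothing is emitted until the head future completes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Simplified model of an ordered async-wait queue: ingestion continues while
// there is capacity, but emission stops at the first uncompleted future.
public class OrderedQueueModel {

    static String simulate() {
        final int capacity = 3; // plays the role of orderedWait's 'capacity'
        Queue<CompletableFuture<String>> queue = new ArrayDeque<>();

        CompletableFuture<String> stuck = new CompletableFuture<>(); // slow external call, never completes here
        List<CompletableFuture<String>> arriving = List.of(
                stuck,
                CompletableFuture.completedFuture("b"),
                CompletableFuture.completedFuture("c"));

        // The "source" keeps handing over elements as long as capacity remains;
        // only a full queue would back-pressure the source.
        for (CompletableFuture<String> f : arriving) {
            if (queue.size() < capacity) {
                queue.add(f);
            }
        }

        // Emission: only completed futures at the head may leave the queue.
        List<String> emitted = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().isDone()) {
            emitted.add(queue.poll().join());
        }
        return "queued=" + queue.size() + " emitted=" + emitted;
    }

    public static void main(String[] args) {
        // The stuck head holds back the two completed results, yet all three
        // elements were accepted: consumption did not stall.
        System.out.println(simulate()); // prints: queued=3 emitted=[]
    }
}
```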

However, I expected that while the external calls are stuck, the stream source should keep pumping from Kafka as long as there is still capacity, but offsets after the stuck record should NOT be committed back to Kafka (and the checkpoint should also stall to accommodate the stalled offsets?).

My observation is that if I set the capacity large enough (max_int / 100 for instance), consumption did not stall (which is good), but the offsets AFTER the stalled records were all committed back to Kafka, all checkpoints succeeded, and no back pressure occurred.

In this case, if some machines crash, how does Flink recover the stalled offsets? Which checkpoint does Flink roll back to? I understand that committing offsets back to Kafka is merely to show progress to external monitoring tools, but I hope Flink does bookkeeping somewhere to record that async call xyz has not returned and should be retried during recovery.

======

I've done some more experiments. It looks like Flink is able to recover the record whose future I completed exceptionally (via completeExceptionally), even if I use 'unorderedWait' on the async stream.

Which leads back to Fabian's earlier comment: 'Flink does not rely on Kafka consumer offsets to recover; committing offsets to Kafka is merely to show progress to external monitoring tools'.

I couldn't pinpoint the code Flink uses to achieve this. Maybe in-flight async invocations in 'UnorderedStreamElementQueue' are part of the checkpoint, and Flink saves the actual payload for later replay?

Can anyone shed some light?
Re: How does Flink recover uncommitted Kafka offsets in AsyncIO?

Fabian Hueske-2
Hi,

Kafka offsets are only managed by the Flink Kafka Consumer. All following operators do not care whether the events were read from Kafka, files, Kinesis or whichever source.
It is the responsibility of the source to include its reading position (in case of Kafka the partition offsets) in a checkpoint.
The AsyncIO operator will checkpoint all events for which requests are currently in flight, i.e., it checkpoints its working set, which is recovered (with the requests being sent out again) after a failure.
Flink's checkpointing mechanism, which is based on checkpoint barriers, ensures that all operator checkpoints (for example Kafka offsets and AsyncIO requests) are consistent: every record that was read before the checkpoint and was stuck in the AsyncIO operator is in the AsyncIO state, and every record that was read after the checkpoint (and before the failure) is read again and is not in the AsyncIO state.
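One way to picture this is a plain-Java toy model (not Flink's actual implementation; `WorkingSetModel` and its method names are made up for illustration): the operator's checkpoint is simply the set of input records whose async requests have not completed, and recovery re-issues exactly those requests.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Toy model of the AsyncIO working set: the checkpoint keeps every record
// whose external request is still in flight; restoring replays those requests.
public class WorkingSetModel {

    // input record -> pending future for its external call
    final Map<String, CompletableFuture<String>> inFlight = new LinkedHashMap<>();

    void invoke(String record, CompletableFuture<String> resultFuture) {
        inFlight.put(record, resultFuture);
    }

    List<String> snapshot() {
        // Checkpoint: only records whose request has not completed need saving.
        List<String> pending = new ArrayList<>();
        inFlight.forEach((rec, fut) -> {
            if (!fut.isDone()) {
                pending.add(rec);
            }
        });
        return pending;
    }

    static List<String> restore(List<String> checkpointed) {
        // Recovery: every checkpointed record gets its request sent out again,
        // which is why the external system may see a call twice (at-least-once).
        return new ArrayList<>(checkpointed);
    }

    public static void main(String[] args) {
        WorkingSetModel m = new WorkingSetModel();
        m.invoke("a", new CompletableFuture<>());                 // request still in flight
        m.invoke("b", CompletableFuture.completedFuture("done")); // already answered
        System.out.println(m.snapshot()); // prints: [a]
    }
}
```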

Btw., if you don't require ordered output from the AsyncIO operator, I'd switch to unordered wait. Otherwise, a single blocking call can block subsequent events from being emitted, because they are not allowed to overtake the blocking event.
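The difference shows up in a small plain-Java model (again not Flink code; `UnorderedEmissionModel` is a made-up sketch assuming unordered emission simply lets any completed result leave): a stuck future no longer holds back results that completed after it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Model of unordered emission: completed results may overtake an uncompleted
// one, so a single slow external call does not delay the other outputs.
public class UnorderedEmissionModel {

    static List<String> emitCompleted(List<CompletableFuture<String>> inFlight) {
        List<String> emitted = new ArrayList<>();
        for (CompletableFuture<String> f : inFlight) {
            if (f.isDone()) {
                emitted.add(f.join()); // emit in completion order, not arrival order
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        CompletableFuture<String> stuck = new CompletableFuture<>(); // slow call, not finished yet
        List<String> out = emitCompleted(List.of(
                stuck,
                CompletableFuture.completedFuture("b"),
                CompletableFuture.completedFuture("c")));
        System.out.println(out); // prints: [b, c] -- the completed results overtook the stuck one
    }
}
```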

Best, Fabian
 
