Flink Kafka ordered offset commit & unordered processing

Flink Kafka ordered offset commit & unordered processing

xwang355
Hi Flink experts,

I am prototyping a real time system that reads from a Kafka source with Flink and calls out to an external system as part of the event processing. One of the most important requirements is that reading from Kafka should NEVER stall, even when some async external calls are slow and hold up certain Kafka offsets. At-least-once processing is good enough.

Currently, I am using AsyncIO with a thread pool of size 20. My understanding is that if I use orderedWait with a large 'capacity', consumption from Kafka should continue even if some external calls are slow (holding up the offsets), as long as the capacity is not exhausted.

(From my own reading of the Flink source code, the capacity of the orderedWait function translates to the size of the OrderedStreamElementQueue.)
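
For reference, this is roughly how I am wiring things up (a simplified sketch, not my actual code; the topic, properties and the no-op async function are placeholders):

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class AsyncPrototypeJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000); // checkpoint every 10 s

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProps.setProperty("group.id", "async-prototype");

        DataStream<String> events = env.addSource(
                new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), kafkaProps));

        // Placeholder async function; the real one hands the record to the
        // external system on a thread pool of size 20.
        AsyncFunction<String, String> externalCall = new AsyncFunction<String, String>() {
            @Override
            public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
                resultFuture.complete(Collections.singleton(input));
            }
        };

        // 'capacity' is the maximum number of in-flight records, i.e. the size of
        // the OrderedStreamElementQueue that orderedWait creates internally.
        DataStream<String> enriched = AsyncDataStream.orderedWait(
                events, externalCall, 30, TimeUnit.SECONDS, 10_000);

        enriched.print();
        env.execute("async prototype");
    }
}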

However, I expect that while the external calls are stuck, the stream source should keep pumping records out of Kafka as long as there is still capacity, but offsets after the stuck record should NOT be committed back to Kafka (and should the checkpoint also stall to accommodate the stalled offsets?).

My observation is that if I set the capacity large enough (max_int / 100, for instance), consumption was not stalled (which is good), but the offsets AFTER the stalled records were all committed back to Kafka, all checkpoints succeeded, and no back pressure was incurred.

In this case, if some machines crash, how does Flink recover the stalled offsets? Which checkpoint does Flink roll back to? I understand that committing offsets back to Kafka is merely to show progress to external monitoring tools, but I hope Flink does bookkeeping somewhere to record that async call xyz has not returned and should be retried during recovery.

Thanks a lot
Ben




Re: Flink Kafka ordered offset commit & unordered processing

Piotr Nowojski-3
Hi,

If your async operations are stalled, this will eventually cause problems. Either this will back pressure the sources (the async operator’s queue will become full) or you will run out of memory (if you configured the queue’s capacity too high). I think the only possible solution is to either drop records in some way, or to spill them to some storage for later processing (assuming that the storage will not overflow and will not cause stalls on its own).
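
To illustrate the "drop" option, just a sketch (the class name is made up; by default the timeout callback fails the record):

import java.util.Collections;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// Sketch: drop a record whose external call did not finish within the orderedWait
// timeout, instead of failing the job (the default timeout() completes exceptionally).
public abstract class DroppingAsyncFunction<IN, OUT> extends RichAsyncFunction<IN, OUT> {

    @Override
    public void timeout(IN input, ResultFuture<OUT> resultFuture) {
        // emit nothing for this record and let the stream keep moving
        resultFuture.complete(Collections.<OUT>emptyList());
    }
}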

Regarding the Kafka offsets, as you wrote, Flink’s KafkaConsumer is not using internal Kafka offsets for recovery - for this purpose Kafka offsets are stored inside Flink’s state.
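
A small sketch, assuming the universal FlinkKafkaConsumer (topic and properties are placeholders): whether offsets are committed back to Kafka on completed checkpoints is just a toggle on the consumer, and it does not change what Flink restores from.

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), kafkaProps);

// Commits to Kafka only make progress visible to external monitoring tools;
// recovery always uses the offsets stored in Flink's checkpointed state.
consumer.setCommitOffsetsOnCheckpoints(true);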

Regarding checkpointing, you can read about how it is done in general in the docs [1]. Once barrier alignment for the async operator is done, it checkpoints its state. Part of this state is the queue of elements that are currently being processed asynchronously. So if a failure happens, after recovery all of the operators (sources, async operator, sinks, …) are effectively restored to the same logical point in time. In the case of the async operator, async operations that were caught in the middle of processing when the checkpoint barriers arrived are resubmitted/retried.
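
To make the retry semantics concrete, here is a rough sketch of an async function (the class name, executor and client call are made up, not your code); because in-flight requests are resubmitted after recovery, the external call should tolerate being issued more than once (at-least-once):

import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class ExampleAsyncFunction extends RichAsyncFunction<String, String> {

    private transient ExecutorService executor;

    @Override
    public void open(Configuration parameters) {
        // one pool per parallel subtask (20 threads, as in your setup)
        executor = Executors.newFixedThreadPool(20);
    }

    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
        executor.submit(() -> {
            try {
                // placeholder for the real (blocking) client call
                String response = callExternalSystem(input);
                resultFuture.complete(Collections.singleton(response));
            } catch (Exception e) {
                // failing the record triggers recovery; after restore the element is
                // still in the checkpointed queue, so the request is issued again
                resultFuture.completeExceptionally(e);
            }
        });
    }

    @Override
    public void close() {
        if (executor != null) {
            executor.shutdown();
        }
    }

    private String callExternalSystem(String input) {
        return input; // placeholder
    }
}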

I hope that answers your questions :)

Piotrek 

[1] https://ci.apache.org/projects/flink/flink-docs-stable/internals/stream_checkpointing.html

Re: Flink Kafka ordered offset commit & unordered processing

Piotr Nowojski-3
In reply to this post by xwang355
Hi,

> Will Flink be able to recover under this scenario?

I’m not sure exactly what you mean.

Flink will be able to restore the state to the last successful checkpoint, and it could well be that some records after this initial “stuck record” were processed and emitted down the stream. In that case, Flink’s AsyncWaitOperator will re-submit all of the requests that were not completed successfully.

However, keep in mind that there are currently some issues [1] with the AsyncWaitOperator if it is chained after an operator that registers processing timers, or after a flatMap. For now, for safety, it is best to manually make sure that the AsyncWaitOperator is at the head of an operator chain.
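
For example (just a sketch; 'events' is the Kafka source stream and 'externalCall' the AsyncFunction):

DataStream<String> enriched = AsyncDataStream
        .orderedWait(events, externalCall, 30, TimeUnit.SECONDS, 10_000)
        .startNewChain(); // keep the AsyncWaitOperator at the head of its operator chain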

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-13063

On 3 Jul 2019, at 15:49, [hidden email] wrote:

Hi Piotrek,

Thank you for replying. My reply to your post never showed up for some reason, so I am trying to see if replying to the author directly gets my message out.


"If your async operations are stalled, this will eventually cause problems. Either this will back pressure sources (the async’s operator queue will become full) or you will run out of memory (if you configured the queue’s capacity too high)."

I agree; if the stall is caused by general slowness of the external processing system, no doubt about it. However, I just want to make sure that occasional slowness can be absorbed, and that even if a record eventually fails to be processed, Flink has a way to replay just that record. The important thing is that one slow async call does not suspend consumption of the whole stream.

"I think the only possible solution is to either drop records in some way, or to spill them to some storage for later processing"

If there were indeed a catastrophic failure of the external processing system, then assuming the records are still within Kafka's retention period, the only possible solution is to manually roll back the Kafka offsets for reprocessing, which is OK for my use case.
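
(For example, something along these lines when restarting the job without restoring from a checkpoint; the topic, partition and offset are made up, and the consumer is the universal FlinkKafkaConsumer:)

// KafkaTopicPartition is org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
Map<KafkaTopicPartition, Long> offsets = new HashMap<>();
offsets.put(new KafkaTopicPartition("transfers", 0), 1234L);

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("transfers", new SimpleStringSchema(), kafkaProps);
// only applies when the job starts fresh; a restored checkpoint/savepoint wins over this setting
consumer.setStartFromSpecificOffsets(offsets);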

Imagine the use case is a bank money transfer, and the external system is a money-laundering check. At one point the external system became slow and one transfer request got stuck, but the following transfer requests were still processed. Then the external system tipped over: some transfer requests after the initially stuck request were processed, some were not, and the Flink consumer thread stopped pulling from Kafka due to backpressure. Then the entire Flink cluster experienced a power outage. Will Flink be able to recover under this scenario?


