Scenario
======= A partition that Flink is reading: [ 1 - 2 - 3 - 4 - 5 - 6 - 7 - | 8 _ 9 _ 10 _ 11 | 12 ~ 13 ] [. Committed. | In flight | unread ] Kafka basically breaks off pieces of the end of the queue and shoves them downstream for processing? So suppose while semantically: - 8 &10 succeed (api call success) - 9 & 11 fail (api failure). Failure Handling options ================== Basically we have two options to handle failures? A. Try/catch to deadletter queue ``` try { api.write(8, 9, 10, 11); } catch E { // 9, 11 failed to write to the api so we deadletter them deadletterQueue.write(E.failed_set()) } ``` B. Or it can fail - which will retry the batch? ``` api.write(8, 9, 10, 11); // 9, 11 failed to write to the api ``` In situation (B.), we're rewriting 8 and 10 to the api, which is bad, so situation (A.) seems better. Challenge I can't understand ====================== However in (A.) we then do something with the queue: A2. Try/catch to another deadletter queue? ``` try { api.write(9, 11); } catch E { //11 failed to write to the api deadletterQueue2.write(E.failed_set()) } ``` Do you see what I mean? Is it turtles all the way down? Should I create a separate index of semantic outcome? Where should it live? Should I just keep things in the queue until smime.p7s (5K) Download Attachment |
Hi Jack, I do not fully understand what you want to achieve here. Could you please give me a bit more context? Are you asking how Flink realizes exactly once processing guarantees with its connectors? Cheers, Till On Fri, Jul 31, 2020 at 8:56 PM Jack Phelan <[hidden email]> wrote:
|
Hi Till,
Till: Could you please give me a bit more context? Are you asking how Flink realizes exactly once processing guarantees with its connectors? Thank you very much for your response! Flink has a lot of really cool ideas :) I did read more about connectors and I think I can elaborate. The problem I have is about backpressure and failures - all coming from sinks/external apis. (Don't worry we're doing retries and backoff but still there's more to try :) What I'd like to achieve is:
Kafka with topics per API: Topic 1: AWS Topic 2: Azure Topic 3: GCP Topic 4: IBM ... (more topcs) read Kafka -> (key by API in [AWS, Azure, GCP, IBM, Oracle, etc]) -> batch and write to the API in the key. I'm reading in api scoped topics from kafka, keying by api and then attempting to write to the api in the key. I know from time to time that some of these APIs will have limited capacity. For example, we're writing to AWS in a shared account that has API limits that other users in the account can exhaust, limiting our capacity to write for unpredictable periods. I'd really like to get your feedback on the approaches I'm considering and any suggestions you might have. Handling failed writesApproach 1. BlockIf I just block in the API write, then backpressure will build up and get to the kafka reader which will then not read in new records, leaving them in kafka? Is this a reasonable approach? What about the filled buffers inside the TaskManagers? Should I worry about them? Will backpressure from one topic and one api delay the others? Kafka will let writers keep appending, and we would catch up when the API is not degraded. Approach 2. DeadletterI'm considering writing the event to a deadletter queue, but then whoever processes the deadletter queue has the same issue, since the API can be degraded at any time. We don't want to manually start deadletter processing, since degraded APIs happen very regularly.Approach 3. Re-enqueueI'm considering writing the task back to the same input queue topic with metadata that it's been attempted. Is it possible to delay emitting records from one topic because it has a degraded sink, but continue to process others? I don't think I can prioritize events within one topic - the AWS topic for instance. I could create topics with prioritization, AWS_P0, AWS_P1, etc. ThrottlingI'm thinking just keep some state for the API by the API key, about the number of requests and the proportion that are accepted. (Something like https://landing.google.com/sre/sre-book/chapters/handling-overload/#client-side-throttling-a7sYUg) If I decide that I should throttle this event, then I need to apply whatever backpressure mechanism I decided on. Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com. |
Hi Jack, if your records are already partitioned wrt the individual topics and you don't need to compute some global values, then you could create for every topic a separate Flink pipeline (separate FlinkKafkaConsumer) which runs independently. That way if one of the APIs degrades it will automatically backpressure the Kafka consumer for this specific topic w/o affecting the other topics. If you do indeed need to read all different topics from the same connected component of your job graph (e.g. you want to calculate some global value, the input is not properly partitioned which entails that you need a keyBy operation), then I believe that it can happen that one slow sink (API in your case) can potentially backpressure the whole topology and thereby all other APIs. The next best approach I believe would be approach 3. That way you would unblock the pipeline in order to be able to make progress. However, the problem is that you might end up in something similar to a busy loop if your API A is currently degraded and A is the only record left in your topic to process. If you can tolerate to lose some records, then approach 2. could also work. Here one could try to keep a subset of the failed requests in memory until it grows too big or until they could be successfully retried. But this strongly depends on your use case. Cheers, Till Cheers, Till On Wed, Aug 5, 2020 at 12:44 AM jackphelanbroadcom <[hidden email]> wrote: Hi Till,Till: Could you please give me a bit more context? Are you asking how Flink realizes exactly once processing guarantees with its connectors? |
Free forum by Nabble | Edit this page |