Hi, is there a way to specify an exponential backoff strategy for when async function calls fail? I have an async function that makes web requests to a rate-limited API. Can this be handled with settings on the async function call? Thanks, William
Hi William, I think the way to go would be to do the exponential back-off in the user code and set the timeout of the AsyncOperator to the sum of the timeouts in the user code (e.g. 2s + 4s + 8s + 16s = 30s). Cheers, Konstantin On Thu, Mar 7, 2019 at 5:20 PM William Saar <[hidden email]> wrote:
-- Konstantin Knauf | Solutions Architect +49 160 91394525 Follow us @VervericaData -- Join Flink Forward - The Apache Flink Conference Stream Processing | Event Driven | Real Time -- Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany -- Data Artisans GmbHRegistered at Amtsgericht Charlottenburg: HRB 158244 B Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen |
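Editor's sketch of the approach Konstantin describes, not code from the thread: the retry loop lives in user code, and the operator timeout is sized to cover the sum of the back-off waits. The names `BackoffRetry`, `retry` and `totalBudgetMillis` are hypothetical, the sketch uses only the JDK, and it assumes the supplied call reports failure through its future rather than by throwing. The call's own latency also counts against the operator timeout, so the computed sum is a lower bound.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical helper (not part of Flink): retries an async call with exponential back-off. */
final class BackoffRetry {

    /** Sum of the back-off waits, doubling each time: 2s, 4s, 8s, 16s gives 30s for 4 retries. */
    static long totalBudgetMillis(long initialDelayMillis, int maxRetries) {
        long total = 0;
        long delay = initialDelayMillis;
        for (int i = 0; i < maxRetries; i++) {
            total += delay;
            delay *= 2;
        }
        return total;
    }

    /** Runs the call once and, on failure, up to maxRetries more times with doubling waits. */
    static <T> CompletableFuture<T> retry(
            Supplier<CompletableFuture<T>> call,
            int maxRetries,
            long initialDelayMillis,
            ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(call, maxRetries, initialDelayMillis, scheduler, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> call,
            int retriesLeft,
            long delayMillis,
            ScheduledExecutorService scheduler,
            CompletableFuture<T> result) {
        call.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (retriesLeft == 0) {
                // Retries exhausted: surface the last failure to the caller.
                result.completeExceptionally(error);
            } else {
                // Wait, then try again with one retry fewer and twice the delay.
                scheduler.schedule(
                        () -> attempt(call, retriesLeft - 1, delayMillis * 2, scheduler, result),
                        delayMillis,
                        TimeUnit.MILLISECONDS);
            }
        });
    }
}
```

Inside `asyncInvoke`, the future returned by `retry` would be forwarded to the `ResultFuture`, and the timeout passed to the async operator would be at least `totalBudgetMillis(2000, 4)`, i.e. 30 seconds, plus whatever the requests themselves take.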
Hi Konstantin, (cc Till since he owns the code) For async I/O, I/O failure and retry is a common and expected pattern. In most use cases, users will need to deal with I/O failure and retry. Therefore, I think it's better to address the problem in Flink rather than have each user implement custom logic in user code; that would give a better developer experience. We have a similar problem in many of our use cases. To enable backoff and retry, we need to put the failed message into a DLQ (another Kafka topic) and re-ingest the message from the DLQ topic to retry, which is manual and cumbersome and requires setting up an extra Kafka topic. Can we add multiple strategies to handle async I/O failure in the AsyncWaitOperator? I propose the following strategies:
What do you guys think? Thanks a lot. Shuyi On Fri, Mar 8, 2019 at 3:17 PM Konstantin Knauf <[hidden email]> wrote:
Hi Shuyi, I am not sure. You could handle retries in the user code within org.apache.flink.streaming.api.functions.async.AsyncFunction#asyncInvoke without using a DLQ, as described in my original answer to William. On the other hand, I agree that it could be easier for the user, and it is indeed a common scenario. Two follow-up questions come to mind:
Would you like to create a JIRA ticket [1] for this improvement with answers to the questions above, so that we can continue the discussion there? Best, Konstantin On Sun, Mar 10, 2019 at 9:20 AM Shuyi Chen <[hidden email]> wrote:
Thanks for raising the concern @shuyi and for the explanation @konstantin. From a glance at the Flink documentation, it seems that users have full control over the timeout behavior [1]. But from user code, unlike inside the AsyncWaitOperator, it is not straightforward to access the internal state of the operator to, for example, put the message back into the async buffer with a retry tag. Thus, I also think that providing a set of common timeout-handling strategies is a good idea for Flink users, and this could be a very useful feature. Regarding the questions and concerns:
1. Should the "retry counter" be reset or continue where it left off? - This is definitely a good point, as this counter might need to go into the operator state if we decide to carry over the retry counter. Functionality-wise I think it should be reset, because after a restart it no longer represents the same transient state as at the time of failure.
2. When should AsyncDataStream.orderedWait() skip a record? - I assume this should be configurable by the user. For example, we can have additional properties for each strategy described by @shuyi, such as a combination of (RETRY_STRATEGY, MAX_RETRY_COUNT, RETRY_FAILURE_POLICY).
I've also created a JIRA ticket [2] for the discussion; please feel free to share your thoughts and comments. -- Rong On Tue, Mar 12, 2019 at 6:29 AM Konstantin Knauf <[hidden email]> wrote:
Sorry for joining the discussion so late. I agree that we could add some more syntactic sugar for handling failure cases. Looking at the existing interfaces, I think it should be fairly easy to create an abstract class AsyncFunctionWithRetry which extends AsyncFunction and encapsulates the retry logic for asynchronous operations. I think it is not strictly necessary to change the AsyncWaitOperator to add this functionality. Cheers, Till On Wed, Mar 13, 2019 at 5:42 PM Rong Rong <[hidden email]> wrote:
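Editor's sketch of what an `AsyncFunctionWithRetry` along the lines Till suggests might look like; it is one possible reading, not code from the thread or from Flink. So that it compiles on its own, `ResultFuture` and `AsyncFunction` below are simplified local stand-ins that only mirror the shape of Flink's interfaces of the same name. The `attempt` hook, the constructor parameters and the doubling back-off policy are all assumptions.

```java
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Local stand-ins mirroring the shape of Flink's interfaces, so the sketch is self-contained.
interface ResultFuture<OUT> {
    void complete(Collection<OUT> result);
    void completeExceptionally(Throwable error);
}

interface AsyncFunction<IN, OUT> {
    void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;
}

/** Hypothetical base class: subclasses implement one attempt, the base class handles retries. */
abstract class AsyncFunctionWithRetry<IN, OUT> implements AsyncFunction<IN, OUT> {

    private final int maxRetries;
    private final long initialDelayMillis;
    private final ScheduledExecutorService scheduler;

    protected AsyncFunctionWithRetry(
            int maxRetries, long initialDelayMillis, ScheduledExecutorService scheduler) {
        this.maxRetries = maxRetries;
        this.initialDelayMillis = initialDelayMillis;
        this.scheduler = scheduler;
    }

    /** A single attempt of the asynchronous I/O (assumed hook, implemented by the user). */
    protected abstract CompletableFuture<Collection<OUT>> attempt(IN input);

    @Override
    public final void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) {
        run(input, resultFuture, maxRetries, initialDelayMillis);
    }

    private void run(IN input, ResultFuture<OUT> out, int retriesLeft, long delayMillis) {
        attempt(input).whenComplete((result, error) -> {
            if (error == null) {
                out.complete(result);
            } else if (retriesLeft == 0) {
                // Give up and let the operator decide what a failed record means.
                out.completeExceptionally(error);
            } else {
                // Exponential back-off: double the wait before each further attempt.
                scheduler.schedule(
                        () -> run(input, out, retriesLeft - 1, delayMillis * 2),
                        delayMillis,
                        TimeUnit.MILLISECONDS);
            }
        });
    }
}
```

In a real job the scheduler could not be passed in like this, because functions are serialized when the job is submitted; it would more likely be created in the `open()` method of a `RichAsyncFunction`. The operator timeout would still need to cover all attempts and waits, as noted earlier in the thread.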
Thanks for the feedback @Till. Yes, I agree as well that opening up or changing the AsyncWaitOperator doesn't seem to be a necessity here. I think introducing an "AsyncFunctionBase" and making the current AsyncFunction an extension of it, with some of the default behaviors Shuyi suggested, would be a good starting point. To some extent we can also provide some of the strategies discussed as default building blocks, but I am not sure this is a must once we have the "AsyncFunctionBase". I will try to create a POC for the change, gather some feedback, and see whether the abstract class offers too much or too little flexibility. Best, Rong On Tue, Mar 19, 2019 at 10:32 AM Till Rohrmann <[hidden email]> wrote: