Backoff strategies for async IO functions?


Backoff strategies for async IO functions?

William Saar-2
Hi,
Is there a way to specify an exponential backoff strategy for when async function calls fail?

I have an async function that does web requests to a rate-limited API. Can you handle that with settings on the async function call?

Thanks,
William



Re: Backoff strategies for async IO functions?

Konstantin Knauf-2
Hi William,

The AsyncWaitOperator does not have such a setting. It is "merely" a wrapper around an asynchronous call that provides integration with Flink's state & time management.

I think the way to go would be to do the exponential back-off in the user code and set the timeout of the AsyncWaitOperator to the sum of the timeouts in the user code (e.g. 2s + 4s + 8s + 16s = 30s).
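
For illustration, here is a minimal, non-authoritative sketch of that approach: the exponential-backoff retries live entirely inside the user function, and the operator timeout is sized to cover the whole retry chain. The queryApi client below is only a placeholder for the actual non-blocking call against the rate-limited API; it is not part of Flink.

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class BackoffAsyncFunction extends RichAsyncFunction<String, String> {

    private transient ScheduledExecutorService retryScheduler;

    @Override
    public void open(Configuration parameters) {
        retryScheduler = Executors.newSingleThreadScheduledExecutor();
    }

    @Override
    public void close() {
        retryScheduler.shutdownNow();
    }

    @Override
    public void asyncInvoke(String request, ResultFuture<String> resultFuture) {
        attempt(request, resultFuture, 0, 2_000L); // first retry after 2s
    }

    private void attempt(String request, ResultFuture<String> resultFuture, int retries, long backoffMillis) {
        queryApi(request).whenComplete((response, error) -> {
            if (error == null) {
                resultFuture.complete(Collections.singleton(response));
            } else if (retries < 4) {
                // Back off and try again after 2s, 4s, 8s, 16s before giving up.
                retryScheduler.schedule(
                        () -> attempt(request, resultFuture, retries + 1, backoffMillis * 2),
                        backoffMillis, TimeUnit.MILLISECONDS);
            } else {
                resultFuture.completeExceptionally(error);
            }
        });
    }

    // Placeholder for the actual non-blocking call against the rate-limited API.
    private CompletableFuture<String> queryApi(String request) {
        return CompletableFuture.completedFuture("response for " + request);
    }
}

The timeout passed to AsyncDataStream then has to leave room for the 2s + 4s + 8s + 16s of backoff plus the request latencies, e.g. AsyncDataStream.unorderedWait(stream, new BackoffAsyncFunction(), 35, TimeUnit.SECONDS, 100).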

Cheers,

Konstantin



Re: Backoff strategies for async IO functions?

Shuyi Chen
Hi Konstantin,

(cc Till since he owns the code)

For async IO, failure and retry is a common and expected pattern; in most use cases, users will need to deal with IO failures and retries. Therefore, I think it's better to address the problem in Flink rather than having each user implement custom logic in user code, for a better developer experience. We have this problem in many of our use cases: to enable backoff and retry, we currently put the failed message into a DLQ (another Kafka topic) and re-ingest it from the DLQ topic to retry, which is manual, cumbersome, and requires setting up an extra Kafka topic.

Can we add multiple strategies to handle async IO failure in the AsyncWaitOperator? I propose the following strategies:

  • FAIL_OPERATOR (default & current behavior)
  • FIX_INTERVAL_RETRY (retry with configurable fixed interval up to N times)
  • EXP_BACKOFF_RETRY (retry with exponential backoff up to N times)
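
To make the proposal concrete, here is a purely hypothetical sketch of such a setting; none of these names exist in Flink's API today:

// Hypothetical illustration only, not existing Flink API.
public enum AsyncRetryStrategy {
    FAIL_OPERATOR,       // default & current behavior: fail the operator on error
    FIX_INTERVAL_RETRY,  // retry with a configurable fixed interval, up to N times
    EXP_BACKOFF_RETRY    // retry with exponential backoff, up to N times
}

A strategy like this could, for example, be passed alongside the existing timeout and capacity arguments of AsyncDataStream.
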
What do you guys think? Thanks a lot.

Shuyi


Re: Backoff strategies for async IO functions?

Konstantin Knauf-2
Hi Shuyi,

I am not sure. You could handle retries in the user code within org.apache.flink.streaming.api.functions.async.AsyncFunction#asyncInvoke without using a DLQ, as described in my original answer to William. On the other hand, I agree that it could be easier for the user, and it is indeed a common scenario.

Two follow-up questions come to mind:
  • When a Flink job fails and restarts, would you expect the "retry counter" to be reset or to continue where it left off?
  • With AsyncDataStream.orderedWait(), when would you expect a record to be skipped? After the final timeout, or after the first timeout?
Would you like to create a JIRA ticket [1] for this improvement with answers to the questions above, so we can continue the discussion there?

Best,

Konstantin




Re: Backoff strategies for async IO functions?

Rong Rong
Thanks for raising the concern @shuyi and the explanation @konstantin. 

Glancing at the Flink documentation, it seems the user has full control over the timeout behavior [1]. But unlike the AsyncWaitOperator itself, the user function cannot easily access the internal state of the operator to, for example, put the message back into the async buffer with a retry tag. Thus, I also think that providing a set of common timeout-handling strategies is a good idea for Flink users and could be a very useful feature.

Regarding the questions and concerns:
1. Should the "retry counter" be reset, or continue where it left off?
- This is definitely a good point, as the counter might need to go into operator state if we decide to carry it over. Functionality-wise I think it should be reset, because after a restart it no longer represents the transient state that existed at the time of failure.

2. When should AsyncDataStream.orderedWait() skip a record?
- I am assuming this should be configurable by the user; for example, we could have additional properties for each strategy described by @shuyi, like the following combination (sketched below):
  - (RETRY_STRATEGY, MAX_RETRY_COUNT, RETRY_FAILURE_POLICY)
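
As a purely hypothetical illustration (again, not existing Flink API), such a combination could be grouped into one config object, reusing the AsyncRetryStrategy enum sketched earlier in the thread:

// Hypothetical sketch of the (RETRY_STRATEGY, MAX_RETRY_COUNT, RETRY_FAILURE_POLICY) combination.
public class AsyncRetryConfig implements java.io.Serializable {

    public enum FailurePolicy { FAIL_OPERATOR, SKIP_RECORD } // what to do after the final failed attempt

    private final AsyncRetryStrategy strategy;  // e.g. EXP_BACKOFF_RETRY
    private final int maxRetryCount;            // give up after N retries
    private final FailurePolicy failurePolicy;  // fail the job or skip the record

    public AsyncRetryConfig(AsyncRetryStrategy strategy, int maxRetryCount, FailurePolicy failurePolicy) {
        this.strategy = strategy;
        this.maxRetryCount = maxRetryCount;
        this.failurePolicy = failurePolicy;
    }
}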

I've also created a JIRA ticket [2] for the discussion; please feel free to share your thoughts and comments.

--
Rong



Re: Backoff strategies for async IO functions?

Till Rohrmann
Sorry for joining the discussion so late. I agree that we could add some more syntactic sugar for handling failure cases. Looking at the existing interfaces, I think it should be fairly easy to create an abstract class AsyncFunctionWithRetry which extends AsyncFunction and encapsulates the retry logic for asynchronous operations. I think it is not strictly necessary to change the AsyncWaitOperator to add this functionality.
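
Such a class does not exist in Flink today; purely as a rough sketch of what Till describes, an abstract base along these lines could encapsulate the retry and backoff loop so that subclasses only implement a single asynchronous attempt (the invokeOnce name is made up for illustration):

import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;

// Hypothetical sketch only, not part of Flink: retry logic layered on top of the AsyncFunction interface.
public abstract class AsyncFunctionWithRetry<IN, OUT> implements AsyncFunction<IN, OUT> {

    private final int maxRetries;
    private final long initialBackoffMillis;
    private transient ScheduledExecutorService retryScheduler;

    protected AsyncFunctionWithRetry(int maxRetries, long initialBackoffMillis) {
        this.maxRetries = maxRetries;
        this.initialBackoffMillis = initialBackoffMillis;
    }

    /** A single asynchronous attempt; subclasses implement the actual IO call here. */
    protected abstract CompletableFuture<Collection<OUT>> invokeOnce(IN input);

    @Override
    public void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) {
        if (retryScheduler == null) {
            // Created lazily on the task manager, since executors are not serializable.
            retryScheduler = Executors.newSingleThreadScheduledExecutor();
        }
        attempt(input, resultFuture, 0, initialBackoffMillis);
    }

    private void attempt(IN input, ResultFuture<OUT> resultFuture, int retries, long backoffMillis) {
        invokeOnce(input).whenComplete((result, error) -> {
            if (error == null) {
                resultFuture.complete(result);
            } else if (retries < maxRetries) {
                // Exponential backoff: double the delay before the next attempt.
                retryScheduler.schedule(
                        () -> attempt(input, resultFuture, retries + 1, backoffMillis * 2),
                        backoffMillis, TimeUnit.MILLISECONDS);
            } else {
                resultFuture.completeExceptionally(error);
            }
        });
    }
}

As in the earlier sketch, the timeout passed to AsyncDataStream would still need to be large enough to cover the full retry chain.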

Cheers,
Till


Re: Backoff strategies for async IO functions?

Rong Rong
Thanks for the feedback @Till. 

Yes, I agree as well that opening up or changing the AsyncWaitOperator doesn't seem to be a necessity here.
I think introducing an "AsyncFunctionBase" and making the current AsyncFunction an extension of it, with some of the default behaviors Shuyi suggested, seems like a good starting point.
To some extent we could also provide some of the discussed strategies as default building blocks, but I am not sure that is a must once we have the "AsyncFunctionBase".

I will try to create a POC for the change, gather some feedback, and see whether the abstract class offers too much or too little flexibility.

Best,
Rong 
