AsyncFunction retries

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

AsyncFunction retries

Gadi Katsovich
Hi all,
I have a job with the following diagram:
source -> Flat Map -> Filter -> Filter -> Filter -> async wait operator -> Process -> sink

The async operation sends an HTTP post (using Apache HttpAsyncClient).
In case the HTTP post times out or fails, I want to retry a few times.

Is using FutureUtils.retryWithDelay() acceptable in user code?
I tried it with local tests and the application works as expected. However, the API requires a ScheduledExecutor. And even though I provide one I see that the code is executed on the ForkJoin common pool.

I'm a bit confused as to how the threads work here, and I'm afraid to take up resources needed for Flink framework operation.

Please advise.
Reply | Threaded
Open this post in threaded view
|

Re: AsyncFunction retries

Arvid Heise-3
Hi Gadi,

FutureUtils is not a public API, so there are no single guarantees that if the method works now, it would work in any coming Flink version.

Rather, I'd first check if you can use httpcomponents client 5.0+, then you could simply use the retry handler [1].
If not, then I'd probably copy the code of retryWithDelay and and adjust the code to use the executor of httpcomponents (whenComplete instead of whenCompleteAsync).

In general, as long as you are not synchronously calling in AsyncFunction, you can choose any thread you want. I'm not aware that FutureUtils.retryWithDelay actually uses the common pool (after all you need to supply an executor).


On Thu, Jul 9, 2020 at 4:43 PM Gadi Katsovich <[hidden email]> wrote:
Hi all,
I have a job with the following diagram:
source -> Flat Map -> Filter -> Filter -> Filter -> async wait operator -> Process -> sink

The async operation sends an HTTP post (using Apache HttpAsyncClient).
In case the HTTP post times out or fails, I want to retry a few times.

Is using FutureUtils.retryWithDelay() acceptable in user code?
I tried it with local tests and the application works as expected. However, the API requires a ScheduledExecutor. And even though I provide one I see that the code is executed on the ForkJoin common pool.

I'm a bit confused as to how the threads work here, and I'm afraid to take up resources needed for Flink framework operation.

Please advise.


--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng