Flink requesting external web service with rate limited requests

Giuliano Caliari
Hello,

I have an interesting problem that I'm having a hard time modeling in Flink, and I'm not sure whether Flink is the right tool for the job.

I have a stream of messages in Kafka that I need to group and send to an external web service, but I have two constraints to address:

1. Rate Limited requests => Only tens of requests per minute. If the limit is exceeded the system has to stop making requests for a few minutes.
2. Crash handling => I'm using savepoints
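To make the first constraint concrete, here is a minimal sketch of a limiter that allows a fixed number of requests per time window and stops all requests for a cool-down period once the service reports that the limit was exceeded. It uses only the Java standard library and is not part of Flink; the class name `CooldownRateLimiter`, its methods, and the numbers in the usage note are assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical helper (not a Flink class): allows at most maxRequests per
 * sliding window and refuses everything during a cool-down period.
 */
class CooldownRateLimiter {
    private final int maxRequests;
    private final long windowMillis;
    private final long cooldownMillis;
    // Timestamps of the requests sent inside the current window, oldest first.
    private final Deque<Long> sentAt = new ArrayDeque<>();
    private long blockedUntil = Long.MIN_VALUE;

    CooldownRateLimiter(int maxRequests, long windowMillis, long cooldownMillis) {
        this.maxRequests = maxRequests;
        this.windowMillis = windowMillis;
        this.cooldownMillis = cooldownMillis;
    }

    /** Returns true and records the request if one may be sent at nowMillis. */
    synchronized boolean tryAcquire(long nowMillis) {
        if (nowMillis < blockedUntil) {
            return false; // still cooling down
        }
        // Forget requests that have left the window.
        while (!sentAt.isEmpty() && nowMillis - sentAt.peekFirst() >= windowMillis) {
            sentAt.pollFirst();
        }
        if (sentAt.size() >= maxRequests) {
            return false; // window is full
        }
        sentAt.addLast(nowMillis);
        return true;
    }

    /** Call when the web service signals that the limit was exceeded. */
    synchronized void reportLimitExceeded(long nowMillis) {
        blockedUntil = nowMillis + cooldownMillis;
    }
}
```

For example, `new CooldownRateLimiter(30, 60_000L, 180_000L)` would allow 30 requests per minute and pause for three minutes after a rejection. The caller passes the current time explicitly, which keeps the class easy to test; in a job it would typically be `System.currentTimeMillis()`.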

My first (naive) solution was to make the requests in a sink function, but a request may take a long time to return (up to minutes), and blocking the thread interferes with the savepoint mechanism (see http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Rate-limit-processing-td11174.html). Enforcing the limit in the sink and relying on backpressure to slow down the flow therefore gets in the way of savepointing. I'm not sure how big a problem this will be, but in my tests the job reads thousands of messages before backpressure kicks in, and a savepoint runs for around 10 minutes before failing with a "Checkpoint expired before completing" exception.

My second implementation slept in the Fetcher of the Kafka consumer, but the web service response times vary so widely that I ended up implementing a communication channel between the sink and the source: an object with mutable state. Not great.

So my question is: is there a clean way to limit the flow of messages through the system according to the rate dictated by a sink function? Is there any other way I could make this work in Flink?

Thank you

Re: Flink requesting external web service with rate limited requests

Fabian Hueske-2
Hi Giuliano,

Flink 1.2 introduced the AsyncFunction which asynchronously sends requests to external systems (k-v-stores, web services, etc.).
You can limit the number of concurrent requests, but AFAIK you cannot specify a limit of requests per minute.
Maybe you can configure the function such that it works for your use case.
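As a rough illustration of what "limiting the number of concurrent requests" means, here is a standard-library sketch that caps the number of in-flight asynchronous calls with a semaphore. In Flink itself the cap is the capacity argument of `AsyncDataStream.unorderedWait` / `orderedWait`; the class `BoundedAsyncClient` below is a hypothetical stand-in, not Flink's implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

/** Hypothetical helper: at most `capacity` asynchronous requests in flight. */
class BoundedAsyncClient<I, O> {
    private final Semaphore permits;
    private final Function<I, CompletableFuture<O>> call;

    BoundedAsyncClient(int capacity, Function<I, CompletableFuture<O>> call) {
        this.permits = new Semaphore(capacity);
        this.call = call;
    }

    /** Blocks the caller only while `capacity` requests are already pending. */
    CompletableFuture<O> submit(I input) {
        permits.acquireUninterruptibly();
        try {
            // Give the permit back as soon as the request finishes or fails.
            return call.apply(input)
                    .whenComplete((result, error) -> permits.release());
        } catch (RuntimeException e) {
            permits.release(); // the call never started
            throw e;
        }
    }

    /** Number of additional requests that could start right now. */
    int availablePermits() {
        return permits.availablePermits();
    }
}
```

Note that this bounds concurrency, not throughput: with fast responses the request rate can still exceed a per-minute limit, which is why a concurrency cap alone may not be enough for this use case.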

Alternatively, you can take it as a blueprint for a custom operator, because it handles watermarks and checkpoints correctly.

I am not aware of a built-in mechanism to throttle a stream. You can do it manually and simply sleep() in a MapFunction but that will also block checkpoints.

Best, Fabian

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-requesting-external-web-service-with-rate-limited-requests-tp11952.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.


Re: Flink requesting external web service with rate limited requests

Yassine MARZOUGUI
Hi Fabian,

I have a related question regarding throttling at the source. If there is a sleep in the source, as in ContinuousFileMonitoringFunction.java:

while (isRunning) {
    synchronized (checkpointLock) {
        monitorDirAndForwardSplits(fileSystem, context);
    }
    Thread.sleep(interval);
}

Does it also block checkpoints?
Thanks.

Best,
Yassine


Re: Flink requesting external web service with rate limited requests

Fabian Hueske-2
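A SourceFunction may only emit records while it holds the checkpointLock (just as `ContinuousFileMonitoringFunction` does).
Flink only takes a checkpoint of the source while it holds that same lock, which ensures correct behavior. Since the Thread.sleep() in your snippet runs outside the synchronized block, it should not block checkpoints.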
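The locking pattern can be tried out without Flink. The sketch below is a plain-Java simulation under stated assumptions: one thread plays the source (emit under the lock, then sleep without it) and the calling thread plays the checkpoint by taking the same lock. `SleepOutsideLockDemo` and its method are hypothetical names, and the 10-second sleep is only there to make the overlap obvious.

```java
import java.util.concurrent.CountDownLatch;

/** Hypothetical simulation of a source that sleeps outside the checkpoint lock. */
class SleepOutsideLockDemo {

    /** Returns true if the "checkpoint" got the lock while the source was sleeping. */
    static boolean checkpointRunsWhileSourceSleeps() {
        final Object checkpointLock = new Object();
        final CountDownLatch emitted = new CountDownLatch(1);

        Thread source = new Thread(() -> {
            synchronized (checkpointLock) {
                // Records would be emitted here; no checkpoint can start meanwhile.
            }
            emitted.countDown();
            try {
                Thread.sleep(10_000L); // throttling pause, taken WITHOUT the lock
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        source.start();

        try {
            emitted.await(); // wait until the source has left the synchronized block
            boolean sourceStillSleeping;
            synchronized (checkpointLock) { // stand-in for the checkpoint
                sourceStillSleeping = source.isAlive();
            }
            source.interrupt(); // cut the sleep short so the demo ends quickly
            source.join();
            return sourceStillSleeping;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

If the sleep were moved inside the synchronized block, the calling thread would have to wait for the full pause before it could take the lock, which is the blocking behavior to avoid.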

Best, Fabian



Re: Flink requesting external web service with rate limited requests

Giuliano Caliari
Hey Fabian,

One of my solutions implements the AsyncFunction, but I'm still unable to take a savepoint. Flink reads the backlog, thousands of historical records, right off the bat, and when I request a savepoint it has to wait for all those records to be processed, which takes a couple of hours. So I still get the error when savepointing.
Alternatively, I could wait for the backlog to be processed and take savepoints afterwards, but if a failure occurs in the meantime I would have to restart the whole process.

Another idea would be to restart the process at a specific Kafka offset, which we could save whenever the external web service returns a positive response. We would need to bundle the Kafka offset with each record. There would be some duplication in case of errors, but that's acceptable. Is there an easy way to do this?
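To sketch the bookkeeping this idea would need, the class below tracks, for a single Kafka partition, which offsets have been sent to the web service and which have been acknowledged, and reports the offset to restart from. It is plain Java with hypothetical names (`AckedOffsetTracker`, `sent`, `acked`, `restartOffset`); persisting the value and making the Kafka consumer start from it are left out.

```java
import java.util.TreeSet;

/** Hypothetical helper: restart position for ONE Kafka partition. */
class AckedOffsetTracker {
    // Offsets sent to the web service that have not been acknowledged yet.
    private final TreeSet<Long> inFlight = new TreeSet<>();
    // Offset of the next record that has not been sent yet.
    private long nextUnsent;

    AckedOffsetTracker(long startOffset) {
        this.nextUnsent = startOffset;
    }

    /** Record that the message at `offset` was sent to the web service. */
    synchronized void sent(long offset) {
        inFlight.add(offset);
        nextUnsent = Math.max(nextUnsent, offset + 1);
    }

    /** Record a positive response for the message at `offset`. */
    synchronized void acked(long offset) {
        inFlight.remove(offset);
    }

    /**
     * Oldest offset without a positive response. Restarting here replays
     * any later records that were already acknowledged (duplicates).
     */
    synchronized long restartOffset() {
        return inFlight.isEmpty() ? nextUnsent : inFlight.first();
    }
}
```

Restarting from the oldest unacknowledged offset gives at-least-once delivery to the web service, which matches the stated tolerance for some duplication after errors.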

Cheers,

Re: Flink requesting external web service with rate limited requests

Fabian Hueske-2
Hi,

I assume the savepoints are slow because the checkpoint barriers, which ensure the consistency of the savepoint, get stuck behind the records that are buffered due to backpressure. At some point the savepoint may be cancelled because it does not appear to make progress.
You can reduce the amount of data buffered under backpressure by reducing the number of network buffers (taskmanager.network.numberOfBuffers) [1].
This will help the barriers reach the operators faster.

I don't think there is a ready-to-go way to tie Kafka offsets to a web service response.
You can of course always implement your own source function, but that's a bit of work.

Best, Fabian

2017-03-01 0:58 GMT+01:00 Giuliano Caliari <[hidden email]>:
Hey Fabian,

One of my solutions implements the AsyncFunction but I'm still unable to
savepoint because Flink reads the backed up records, thousands of historical
records, right off the bat and when I issue a savepoint request it has to
wait for all those records to be processed which takes a couple of hours. So
I'm still getting the error when savepointing.
Alternatively I could wait for the backed up records to be processed and
issue savepoints afterwards but there is a risk of failures and I would have
to restart the whole process.

Another idea would be if we could commit the Kafka offset only after we get
a positive response from the external web service. There would be some
duplication in case of errors but that's acceptable. Is there any easy way
we can do this?

Cheers,