Hello,
I have an interesting problem that I'm having a hard time modeling in Flink, and I'm not sure Flink is the right tool for the job.

I have a stream of messages in Kafka that I need to group and send to an external web service, but I have some concerns that need to be addressed:

1. Rate-limited requests => Only tens of requests per minute. If the limit is exceeded, the system has to stop making requests for a few minutes.
2. Crash handling => I'm using savepoints.

My first (naive) solution was to implement this in a sink function, but the requests may take a long time to return (up to minutes), so blocking the thread interferes with the savepoint mechanism (see here). Because of this, implementing the limit in the sink and relying on backpressure to slow down the flow gets in the way of savepointing. I'm not sure how big a problem this will be, but in my tests thousands of messages are read before the backpressure mechanism kicks in, and savepointing takes around 10 minutes before failing with a "Checkpoint expired before completing" exception.

My second implementation slept in the Fetcher of the Kafka consumer, but the web service request times have a huge variance, so I ended up implementing a communication channel between the sink and the source: an object with mutable state. Not great.

So my question is: is there a nice way to limit the flow of messages through the system according to the rate dictated by a sink function? Is there any other way I could make this work in Flink?

Thank you
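The two rate-limit rules described above (a cap per time window, plus a pause of a few minutes once the service reports the limit was exceeded) can be sketched in plain Java. This is only an illustration under assumptions: the class `CooldownRateLimiter`, its method names, and the idea of passing the current time in explicitly are all hypothetical and not part of Flink or of the original setup.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sliding-window limiter with a cool-down period; not a Flink class. */
final class CooldownRateLimiter {
    private final int maxRequestsPerWindow;
    private final long windowMillis;
    private final long cooldownMillis;
    private final Deque<Long> granted = new ArrayDeque<>(); // timestamps of granted requests
    private long blockedUntil = Long.MIN_VALUE;             // no cool-down active initially

    CooldownRateLimiter(int maxRequestsPerWindow, long windowMillis, long cooldownMillis) {
        this.maxRequestsPerWindow = maxRequestsPerWindow;
        this.windowMillis = windowMillis;
        this.cooldownMillis = cooldownMillis;
    }

    /** Returns true if a request may be sent at time nowMillis, and records it if so. */
    synchronized boolean tryAcquire(long nowMillis) {
        if (nowMillis < blockedUntil) {
            return false; // still cooling down
        }
        // Forget grants that have fallen out of the window.
        while (!granted.isEmpty() && nowMillis - granted.peekFirst() >= windowMillis) {
            granted.pollFirst();
        }
        if (granted.size() >= maxRequestsPerWindow) {
            return false; // window is full
        }
        granted.addLast(nowMillis);
        return true;
    }

    /** Call when the web service signals that the limit was exceeded. */
    synchronized void onLimitExceeded(long nowMillis) {
        blockedUntil = nowMillis + cooldownMillis;
    }
}
```

A caller would check `tryAcquire(System.currentTimeMillis())` before each request and call `onLimitExceeded(...)` when the service rejects a request. Where this check lives (source, operator, or sink) is exactly the open question of this thread.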
Hi Giuliano,

Flink 1.2 introduced the AsyncFunction, which asynchronously sends requests to external systems (key-value stores, web services, etc.).

In reply to the message from Giuliano Caliari <[hidden email]>, 2017-02-28 3:19 GMT+01:00
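To illustrate the general idea behind asynchronous requests with a bounded number in flight, here is a minimal plain-Java sketch. It deliberately does not use Flink's AsyncFunction API; the class `BoundedAsyncClient`, its `capacity` parameter, and the `call` function standing in for the blocking web service request are assumptions made for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

/** Hypothetical helper: at most `capacity` requests are in flight at any time. */
final class BoundedAsyncClient<I, O> {
    private final Semaphore inFlight;
    private final ExecutorService pool;
    private final Function<I, O> call; // stand-in for the blocking web service call

    BoundedAsyncClient(int capacity, ExecutorService pool, Function<I, O> call) {
        this.inFlight = new Semaphore(capacity);
        this.pool = pool;
        this.call = call;
    }

    /** Starts the request on the pool; blocks the caller only while all slots are taken. */
    CompletableFuture<O> submit(I input) {
        inFlight.acquireUninterruptibly(); // this wait is what slows the caller down
        try {
            return CompletableFuture
                    .supplyAsync(() -> call.apply(input), pool)
                    .whenComplete((result, error) -> inFlight.release());
        } catch (RuntimeException e) {
            inFlight.release(); // the task was never scheduled, so free the slot
            throw e;
        }
    }

    int availableSlots() {
        return inFlight.availablePermits();
    }
}
```

The calling thread is free while a request is pending and only waits once `capacity` requests are outstanding, so a slow response does not occupy the thread for its full duration.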
Hi Fabian,

I have a related question regarding throttling at the source. If there is a sleep in the source, as in ContinuousFileMonitoringFunction.java:

    while (isRunning) {
        synchronized (checkpointLock) {
            monitorDirAndForwardSplits(fileSystem, context);
        }
        Thread.sleep(interval);
    }

does it also block checkpoints?

Thanks.

Best,
Yassine

In reply to the message from Fabian Hueske <[hidden email]>, 2017-02-28 10:39 GMT+01:00
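A SourceFunction may only emit records while it holds the checkpointLock (just as `ContinuousFileMonitoringFunction` does). Flink, in turn, only performs a checkpoint while it holds that same lock. This ensures correct behavior. Because the Thread.sleep(interval) call in your snippet runs outside the synchronized block, the lock is free during the sleep and checkpoints are not blocked.

Best,
Fabian

In reply to the message from Yassine MARZOUGUI <[hidden email]>, 2017-02-28 10:58 GMT+01:00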
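The locking pattern discussed here can be imitated in plain Java. The sketch below is not Flink code: `LockedSourceSketch`, `emitBatch`, and `snapshot` are hypothetical names, and `snapshot()` merely stands in for what a checkpoint would do when it takes the lock.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java imitation of the locking pattern; this is not Flink code. */
final class LockedSourceSketch {
    private final Object checkpointLock = new Object();
    private final List<Long> emitted = new ArrayList<>(); // stand-in for downstream output
    private long position = 0;                             // state a checkpoint would capture
    private volatile boolean isRunning = true;

    /** Emits a batch and advances the position while holding the lock. */
    void emitBatch(int count) {
        synchronized (checkpointLock) {
            for (int i = 0; i < count; i++) {
                emitted.add(position);
                position++;
            }
        }
    }

    /** Stand-in for a checkpoint: reads the state under the same lock. */
    long[] snapshot() {
        synchronized (checkpointLock) {
            return new long[] {position, emitted.size()};
        }
    }

    /** Source loop: the sleep happens after the lock has been released. */
    void run(int batchSize, long intervalMillis) throws InterruptedException {
        while (isRunning) {
            emitBatch(batchSize);
            Thread.sleep(intervalMillis); // lock is free here, so snapshot() can proceed
        }
    }

    void cancel() {
        isRunning = false;
    }
}
```

Since both methods synchronize on the same object, a snapshot never observes a half-emitted batch, and it only has to wait for the batch that is currently being emitted, not for the sleep.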
In reply to this post by Fabian Hueske-2
Hey Fabian,
One of my solutions implements the AsyncFunction, but I'm still unable to savepoint. Flink reads the backed-up records (thousands of historical records) right off the bat, and when I issue a savepoint request it has to wait for all those records to be processed, which takes a couple of hours. So I'm still getting the error when savepointing.

Alternatively, I could wait for the backed-up records to be processed and issue savepoints afterwards, but there is a risk of failures in the meantime, in which case I would have to restart the whole process.

Another idea would be to restart the process at a specific Kafka offset, which we could save on positive responses from the external web service. We would need to bundle the Kafka offset with the records. There would be some duplication in case of errors, but that's acceptable.

Is there any easy way we can do this?

Cheers,
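The offset idea above can be sketched as follows. With asynchronous requests, responses may arrive out of order, so the safe restart point is the lowest offset that has not yet been confirmed. The class `AckedOffsetTracker` and its methods are hypothetical, the sketch covers a single Kafka partition, and it assumes offsets are handed to `sent` in increasing order; how the resulting offset is stored and fed back to the Kafka consumer is left open.

```java
import java.util.TreeSet;

/** Hypothetical per-partition tracker of which offsets the web service has confirmed. */
final class AckedOffsetTracker {
    private final TreeSet<Long> pending = new TreeSet<>(); // sent, not yet confirmed
    private long nextUnsent;                               // all offsets below were sent

    AckedOffsetTracker(long startOffset) {
        this.nextUnsent = startOffset;
    }

    /** Record that the message at this offset was sent to the web service. */
    synchronized void sent(long offset) {
        pending.add(offset);
        nextUnsent = Math.max(nextUnsent, offset + 1);
    }

    /** Record a positive response for the message at this offset. */
    synchronized void acked(long offset) {
        pending.remove(offset);
    }

    /** Offset to restart from: every offset below it has been confirmed. */
    synchronized long restartOffset() {
        return pending.isEmpty() ? nextUnsent : pending.first();
    }
}
```

Restarting from `restartOffset()` can resend messages above that offset which were already confirmed, which matches the duplication described as acceptable above.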
Hi,

I assume the savepoints are slow because the checkpoint barriers, which ensure the consistency of the savepoint, get stuck among the records that are buffered due to backpressure. At some point the savepoint might get cancelled because it does not seem to make progress.

You can reduce the amount of data that is buffered due to backpressure by reducing the number of network buffers (taskmanager.network.numberOfBuffers) [1]. This will help the barriers reach the operators faster.

In reply to the message from Giuliano Caliari <[hidden email]>, 2017-03-01 0:58 GMT+01:00