Hi all,
let's say that I have a "source -> map -> preAggregate -> keyBy -> reduce -> sink" job and the reducer is sending backpressure signals to the preAggregate, map, and source operators. How do I get those signals inside my operator's implementation? I guess it is not possible inside the function, but if I have my own operator implementation (preAggregate), can I get those backpressure signals? I want to read the metric "Shuffle.Netty.Input.Buffers.inputQueueLength" [1] in my preAggregate operator in order to decide when to stop pre-aggregating and flush tuples, and when to keep pre-aggregating. It is something like the "credit-based flow control on the network stack" [2].
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html#default-shuffle-service
[2] https://www.youtube.com/watch?v=AbqatHF3tZI
Thanks!
Felipe
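For readers following the thread, here is a minimal sketch of the kind of pre-aggregator Felipe describes, assuming Tuple2<String, Long> records and a plain FlatMapFunction. The class name, the flushThreshold field, and the count-based flush are made up for illustration; how to drive that threshold from a backpressure signal is precisely the open question of this thread.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class PreAggregator implements FlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    // Buffered partial sums per key. Not checkpointed, so this sketch is not fault tolerant.
    private final Map<String, Long> sums = new HashMap<>();

    // The knob the thread is about: ideally raised while the reducer is backpressured
    // and lowered when it is idle. Here it is just a fixed count.
    private final long flushThreshold;

    private long recordsSinceFlush = 0;

    public PreAggregator(long flushThreshold) {
        this.flushThreshold = flushThreshold;
    }

    @Override
    public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, Long>> out) {
        sums.merge(value.f0, value.f1, Long::sum);
        if (++recordsSinceFlush >= flushThreshold) {
            // flush all partial aggregates downstream and start over
            for (Map.Entry<String, Long> entry : sums.entrySet()) {
                out.collect(Tuple2.of(entry.getKey(), entry.getValue()));
            }
            sums.clear();
            recordsSinceFlush = 0;
        }
    }
}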
Hi Felipe,
That is an interesting idea, controlling the upstream's output based on the downstream's input. If I understood correctly, the preAggregate operator would flush its output while the reduce operator is idle/hungry, and in contrast it would keep aggregating data while there is backpressure. I think this requirement is valid, but unfortunately I guess you cannot get the backpressure signal at the operator level. AFAIK only the task level above it can see the input/output state and decide whether to process or not. If you want to read the reduce operator's `Shuffle.Netty.Input.Buffers.inputQueueLength` metric on the preAggregate side, you might have to rely on some external metric reporter to query it, if possible.
Best,
Zhijiang
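Purely as an illustration of that last suggestion (not from the thread): assuming the JobManager's REST port is 8081 and the reduce vertex's ID is known, the task-level metric could be queried from outside the job, e.g. via the aggregated subtask metrics endpoint of the monitoring REST API, if I read the REST docs correctly. Host and IDs below are placeholders.

// angle-bracketed parts are placeholders to fill in
String metricsUrl = "http://<jobmanager-host>:8081"
        + "/jobs/<jobid>/vertices/<reduce-vertexid>"
        + "/subtasks/metrics?get=Shuffle.Netty.Input.Buffers.inputQueueLength";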
Hi Zhijiang, thanks for your reply. Yes, you understood correctly. The fact that I cannot get "Shuffle.Netty.Input.Buffers.inputQueueLength" inside the operator is probably due to the way the Flink runtime architecture was designed. But I was wondering what kind of signal I can get. I imagine some backpressure message must be available, since backpressure works by slowing down the upstream operators. For example, I can see the ratio per sub-task (i.e., per physical operator) on the web interface [1]. Is there any message flowing backwards that I can read? Is there anything that would let me avoid relying on some external storage? On Tue, Nov 5, 2019 at 12:23 PM Zhijiang <[hidden email]> wrote:
Does anyone know which metric I can rely on to find out whether a given operator is being backpressured? Or how can I call the same Java object that the Flink UI calls to get the backpressure ratio? Thanks, Felipe On Tue, Nov 5, 2019 at 4:15 PM Felipe Gutierrez <[hidden email]> wrote:
I don't think there is a truly sane way to do this. I could envision a separate application triggering samples via the REST API and writing the results into Kafka, which your operator can read. This is probably the most reasonable solution I can come up with. Any attempt at accessing the TaskExecutor or its metrics from within the operator is inadvisable; you'd be encroaching on truly hacky territory. You could also do your own backpressure sampling within your operator (a separate thread within the operator executing the same sampling logic), but I don't know how easy it would be to re-use Flink's code. On 06/11/2019 13:40, Felipe Gutierrez wrote:
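A rough sketch of the standalone poller Chesnay describes, assuming a JobManager REST endpoint on localhost:8081, a Kafka broker on localhost:9092, and a hypothetical topic name backpressure-samples; the job and vertex IDs are passed in as placeholders.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class BackpressurePoller {
    public static void main(String[] args) throws Exception {
        String jobId = args[0];     // placeholder, e.g. taken from GET /jobs
        String vertexId = args[1];  // placeholder, e.g. taken from GET /jobs/<jobid>
        String endpoint = "http://localhost:8081/jobs/" + jobId
                + "/vertices/" + vertexId + "/backpressure";

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            while (true) {
                // fetch the raw JSON of the back pressure sample
                String json;
                try (BufferedReader in = new BufferedReader(
                        new InputStreamReader(new URL(endpoint).openStream()))) {
                    json = in.lines().collect(Collectors.joining());
                }
                // forward it so the preAggregate operator can read it from a side thread
                producer.send(new ProducerRecord<>("backpressure-samples", vertexId, json));
                Thread.sleep(5_000); // poll every few seconds
            }
        }
    }
}

The operator would then consume the backpressure-samples topic from a separate thread and adjust its flush threshold accordingly.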
If I can trigger the sample via the REST API, that is good enough for a POC. Then I can read from any in-memory storage using a separate thread within the operator. But which REST API endpoint gives me the backpressure ratio? On Wed, Nov 6, 2019 at 4:55 PM Chesnay Schepler <[hidden email]> wrote:
You can refer to this document [1] for the REST API details. The backpressure URI is "/jobs/:jobid/vertices/:vertexid/backpressure", but I am not sure whether it is easy to get the jobid and vertexid. Best, Zhijiang
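Regarding Zhijiang's caveat about the IDs: if I read the monitoring REST API docs correctly, both can be discovered through the same API. A small sketch follows, with the base URL and the manual copy-pasting of IDs between steps as simplifying assumptions.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.stream.Collectors;

public class BackpressureLookup {

    static String httpGet(String url) throws Exception {
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(new URL(url).openStream()))) {
            return in.lines().collect(Collectors.joining());
        }
    }

    public static void main(String[] args) throws Exception {
        String base = "http://localhost:8081";
        // 1) /jobs lists the running jobs and their ids
        System.out.println(httpGet(base + "/jobs"));
        // 2) /jobs/<jobid> lists the job's vertices and their ids
        String jobId = args[0];     // paste the id found in step 1
        System.out.println(httpGet(base + "/jobs/" + jobId));
        // 3) the back pressure sample for one vertex
        String vertexId = args[1];  // paste the id found in step 2
        System.out.println(httpGet(base + "/jobs/" + jobId
                + "/vertices/" + vertexId + "/backpressure"));
    }
}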
Cool, I managed to use it. Now I have to get the job ID and vertex ID inside the operator. I forgot to mention: I am using Flink 1.9.1. On Thu, Nov 7, 2019 at 4:59 AM Zhijiang <[hidden email]> wrote:
Hi, We've been dealing with a similar problem of downstream consumers causing backpressure. One idea a colleague of mine suggested is measuring the time it takes to call collect on the Collector[T]. Since this method is used to push records downstream, it will also actively block when the buffer is full and there are no more floating buffers to allocate, hence reflecting the backpressure. Thus, if you know the average time this call takes when there is no backpressure, you can make an educated guess about the time it takes when there is pressure (you'll need to measure these times in your source/operator) and actively slow down the number of records being pushed downstream. Yuval. On Thu, 7 Nov 2019, 9:17 Felipe Gutierrez, <[hidden email]> wrote:
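A minimal sketch of Yuval's timing idea, assuming a wrapper around the Collector handed to the user function; the class name, the exponential moving average, and the ALPHA constant are illustrative choices, not anything Flink provides.

import org.apache.flink.util.Collector;

public class TimedCollector<T> implements Collector<T> {

    private final Collector<T> inner;
    private double avgNanos = 0;              // exponential moving average of collect() latency
    private static final double ALPHA = 0.05; // smoothing factor, to be tuned

    public TimedCollector(Collector<T> inner) {
        this.inner = inner;
    }

    @Override
    public void collect(T record) {
        long start = System.nanoTime();
        inner.collect(record); // blocks when no output buffers are available downstream
        long elapsed = System.nanoTime() - start;
        avgNanos = ALPHA * elapsed + (1 - ALPHA) * avgNanos;
    }

    public double averageCollectNanos() {
        return avgNanos;
    }

    @Override
    public void close() {
        inner.close();
    }
}

Inside the pre-aggregator, one would wrap the out collector once and, for example, flush less eagerly whenever averageCollectNanos() climbs well above the calibrated no-pressure baseline.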
Hmm, that is another possibility. Thanks for your suggestion! On Thu, Nov 7, 2019 at 10:41 PM Yuval Itzchakov <[hidden email]> wrote: