Hi experts,
I am running Flink 1.10 and the job is stateless. I am trying to understand how the local buffer pool works:

1. Let's say taskA and taskB both run in the same TM JVM. Each task will have its own local buffer pool: taskA will write to pool-A, and taskB will read from pool-A and write to pool-B. If taskB consumes from pool-A more slowly than taskA writes to it, this causes backpressure.
2. If the above assumption is correct, it only applies when taskA and taskB are not chained together. If they are chained, there is no buffer in between and the StreamRecord is passed directly from taskA to taskB?
3. What is the configuration parameter for this local buffer pool, and what is the relationship between the local buffer pool and the network buffer pool?
4. Is the configuration for the total local buffer memory per TM, and is it spread evenly between tasks?

Thanks a lot!
Eleanore
Hi Eleanore,

First, I'd like to point to a related blog post, which explains most of these concepts better than I could here [1]. Now let's go through your questions:

1. A buffer pool is just a place where a task gets a buffer from. So pool-A is used by taskA for both reading from the network stack and writing to it. You describe the special case of taskA and taskB being co-located. In this case (and only in this case), a buffer of pool-A is handed over to taskB for immediate consumption. Back-pressure in general occurs when B consumes more slowly than A produces. For distributed tasks, that means B is not freeing buffers fast enough to read from the network, for one of two reasons: its processing is slower than A's production (buffers stuck in the network input), or B itself is back-pressured (buffers stuck in its output). For co-located tasks it is much the same, except that B does not use a buffer from its own pool to read the data, so the buffers of pool-A are stuck in the network input.

2. Correct. Chained tasks do not use buffers at all, which is why chaining should always be preferred. Note that if you have not enabled object reuse, the StreamRecord is cloned between chained operators to ensure data integrity. You should enable object reuse [2] for even better performance; a minimal sketch follows below.

3. The network buffer pool is created per task manager according to the memory settings [2]. It then redistributes its buffers to the local pools of the tasks (there are also exclusive buffers for inputs that live outside of the local pools). Related options are taskmanager.memory.network.fraction (total size of the network buffer pool) and taskmanager.memory.segment-size (which determines the number of buffers), as well as taskmanager.network.memory.buffers-per-channel (exclusive buffers per input channel), taskmanager.network.memory.floating-buffers-per-gate (additional buffers in the local pool per input gate/result partition), and taskmanager.network.memory.max-buffers-per-channel (a cap on buffers per outgoing channel in case of data skew). See the configuration sketch below.

4. Yes, indirectly: the total number of buffers per TM is the effective network memory size (derived from taskmanager.memory.network.fraction) divided by taskmanager.memory.segment-size. And yes, as far as I know the buffers are distributed evenly across the tasks.
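To make point 2 concrete, here is a minimal Java sketch of enabling object reuse on a simple stateless pipeline. The class name and the trivial map function are made up for illustration; the relevant call is env.getConfig().enableObjectReuse().

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ObjectReuseSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // With object reuse enabled, records passed between chained operators are not
        // defensively copied. Operators must then not cache or mutate incoming records.
        env.getConfig().enableObjectReuse();

        DataStream<Integer> doubled = env
                .fromElements(1, 2, 3)
                .map(new MapFunction<Integer, Integer>() {
                    @Override
                    public Integer map(Integer value) {
                        return value * 2;
                    }
                });

        doubled.print();
        env.execute("object-reuse-sketch");
    }
}
```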
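And for points 3 and 4, a sketch of how the network-memory options could be set. In practice these keys go into flink-conf.yaml; setting them programmatically on a local environment here is only meant to show which options shape the network buffer pool and, indirectly, the local pools. The values are illustrative (roughly the defaults), not a recommendation.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NetworkMemoryConfigSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString("taskmanager.memory.network.fraction", "0.1"); // share of TM memory for the network buffer pool
        conf.setString("taskmanager.memory.network.min", "64mb");
        conf.setString("taskmanager.memory.network.max", "1gb");
        conf.setString("taskmanager.memory.segment-size", "32kb");    // size of each network buffer
        conf.setString("taskmanager.network.memory.buffers-per-channel", "2");       // exclusive buffers per input channel
        conf.setString("taskmanager.network.memory.floating-buffers-per-gate", "8"); // extra buffers per gate/result partition

        // Rough sizing for question 4: with 1 GiB of network memory and 32 KiB segments,
        // the TM has 1 GiB / 32 KiB = 32768 buffers to distribute across the local pools.

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(1, conf);
        env.fromElements("a", "b", "c").print();
        env.execute("network-memory-config-sketch");
    }
}
```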
-- Arvid Heise | Senior Java Developer, Ververica GmbH