Hello!
I have a program which creates and runs a local Flink 1.12 environment. I understand that, based on factors such as parallelism and the presence of any process functions, the "Insufficient number of network buffers" exception might pop up.

My plan is to catch this exception inside the main program and restart the job with a new config in which 'taskmanager.memory.network.min' and 'taskmanager.memory.network.max' are set to an equal value, greater than that in the previous run, which would allow the pipeline to run successfully.

My question is whether there is a way to work out, from the error message, the exact minimum number of bytes to set for both of these keys (specifically, from the part stating: "required [...], but only [...] available. The total number of network buffers is currently set to [...] of [...] bytes each").

If the info from the error message cannot be used in a formula to determine this value, I'd probably just end up restarting the job as many times as necessary, instead of at most once, and doubling the value of 'taskmanager.memory.network.min' and 'taskmanager.memory.network.max' on every consecutive retry.

Apologies if this has been answered before. I have seen a fair few questions regarding this error, but none of them seemed to ask about this specifically.

Thanks in advance for any help!

-- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
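For illustration, the fallback plan described above (retry and double the network memory on each failure) could be sketched roughly as follows. This is a minimal sketch and not Flink API: `NetworkMemoryRetry`, `isInsufficientBuffers`, `runWithDoubling` and the `runJob` callback are hypothetical names, and it assumes the failure can be recognised by the "Insufficient number of network buffers" fragment somewhere in the exception's cause chain.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

// Hypothetical helper, not part of Flink.
object NetworkMemoryRetry {

  // Assumption: the failure carries the message fragment quoted in the post,
  // either directly or in one of its causes.
  @tailrec
  def isInsufficientBuffers(t: Throwable): Boolean =
    if (t == null) false
    else if (Option(t.getMessage).exists(_.contains("Insufficient number of network buffers"))) true
    else isInsufficientBuffers(t.getCause)

  // Calls runJob with the given network memory size in bytes. On a buffer
  // failure the size is doubled and the job retried, for at most maxAttempts
  // attempts in total. Returns the size that finally worked.
  @tailrec
  def runWithDoubling(startBytes: Long, maxAttempts: Int)(runJob: Long => Unit): Long =
    Try(runJob(startBytes)) match {
      case Success(_) => startBytes
      case Failure(e) if isInsufficientBuffers(e) && maxAttempts > 1 =>
        runWithDoubling(startBytes * 2, maxAttempts - 1)(runJob)
      case Failure(e) => throw e // unrelated failure, or out of attempts
    }
}
```

In the real program, `runJob` would presumably build a fresh Configuration with both 'taskmanager.memory.network.min' and 'taskmanager.memory.network.max' set to the given size and then execute the pipeline.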
Hi Abelm,

you can calculate the required number of buffers as follows:

- You need to know the number of network connections, which depends on the number of network shuffles (ignoring rescaling and forward channels here, but they require fewer buffers).
- If you have s slots per taskmanager and parallelism p, you need p*s network connections per shuffle.
- Each connection needs 4 buffers (2 input and 2 output).

So if we ignore forward channels (which results in an overestimation), you need buffersPerTaskManager = 4 * slots * parallelism * shuffles. By default each buffer is 32 KB, so you need 128 KB * slots * parallelism for each shuffle in taskmanager.memory.network.min/max.

On Wed, Jan 13, 2021 at 3:34 PM abelm <[hidden email]> wrote: Hello!

--
Arvid Heise | Senior Java Developer
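As a worked example, the estimate from the reply above can be written out in a few lines of plain Scala. This is only a sketch of the arithmetic: `NetworkBufferEstimate` and its members are hypothetical names, the 32 KB default is taken as 32 * 1024 bytes, and the number of shuffles has to be counted by hand from the job graph.

```scala
// Hypothetical helper illustrating the formula; not part of Flink.
object NetworkBufferEstimate {
  // Default buffer (memory segment) size from the reply: 32 KB, assumed to mean 32 * 1024 bytes.
  val DefaultSegmentBytes: Long = 32L * 1024L

  // Buffers per connection from the reply: 2 input + 2 output.
  val BuffersPerConnection: Long = 4L

  // buffersPerTaskManager = 4 * slots * parallelism * shuffles
  def requiredBuffers(slots: Int, parallelism: Int, shuffles: Int): Long =
    BuffersPerConnection * slots * parallelism * shuffles

  // Bytes to put into taskmanager.memory.network.min/max under this estimate.
  def requiredBytes(slots: Int, parallelism: Int, shuffles: Int,
                    segmentBytes: Long = DefaultSegmentBytes): Long =
    requiredBuffers(slots, parallelism, shuffles) * segmentBytes
}
```

For instance, with 4 slots, parallelism 8 and 2 shuffles this gives 4 * 4 * 8 * 2 = 256 buffers, i.e. 256 * 32768 = 8388608 bytes (8 MiB). Since forward channels are ignored, this should be an overestimate for those pipelines.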
Hello Arvid Heise,
Thanks for replying! Based on your suggestion, I put together the following snippet for the config:

    import org.apache.flink.configuration.{Configuration, TaskManagerOptions}

    val config = new Configuration()

    // 4 buffers per connection * segment size * slots * parallelism
    // (`parallelism` is defined elsewhere in the program)
    private val newMemorySize = config.get(TaskManagerOptions.MEMORY_SEGMENT_SIZE)
      .multiply(4)
      .multiply(config.get(TaskManagerOptions.NUM_TASK_SLOTS).toDouble)
      .multiply(parallelism.toDouble)

    // Use the same value for min and max
    config.set(TaskManagerOptions.NETWORK_MEMORY_MIN, newMemorySize)
    config.set(TaskManagerOptions.NETWORK_MEMORY_MAX, newMemorySize)

It seems to work as intended, even with high parallelism, as long as process functions are not involved. As soon as process functions are involved, the value resulting from the formula is too low, regardless of parallelism. How would one go about accounting for that?