Insufficient number of network buffers - rerun with min amount of required network memory

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Insufficient number of network buffers - rerun with min amount of required network memory

abelm
Hello!

I have a program which creates and runs a local Flink 1.12 environment. I
understand that, based on factors such as parallelism and the presence of
any process functions, the "Insufficient number of network buffers"
exception might pop up.

My plan is to catch this exception inside the main program and restart the
job with a new config in which 'taskmanager.memory.network.min' and
'taskmanager.memory.network.max' are set to a an equal value, greater than
that in the previous run, which would allow the pipeline to run
successfully.

My question is whether there exists a way to find out the exact min value of
bytes which should be put as the value for both the previously mentioned min
and max keys from the error message (specifically, the part stating:
"required [...], but only [...] available. The total number of network
buffers is currently set to [...] of [...] bytes each").

Should it not be possible to use the info from the error message in a
formula to determine this value, I'd probably just end up restarting the job
as many times as necessary instead of at most once and double the value of
'taskmanager.memory.network.min' and 'taskmanager.memory.network.max' for
every consecutive retry.

Apologies if this has been answered before. I have seen a fair few questions
regarding this error before, but none of them seemed to ask about this
specifically.

Thanks in advance for any help!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Insufficient number of network buffers - rerun with min amount of required network memory

Arvid Heise-3
Hi Abelm,

you can calculate the required number of buffers as follows:
- You need to know the number of network connections which depend on the network shuffles (ignoring rescaling and forward channels here, but they require less buffers).
- If you have s slots per taskmanager and parallelism p, you need p*s network connections.
- Each connection needs 4 buffers (2 input and 2 output).

So if we ignore forward channels (results in overestimation), you need buffersPerTaskManager = 4 * slots * parallelism * shuffles. By default each buffer is 32kb, so you need 128kb * slots * parallelism in taskmanager.memory.network.min/max.

On Wed, Jan 13, 2021 at 3:34 PM abelm <[hidden email]> wrote:
Hello!

I have a program which creates and runs a local Flink 1.12 environment. I
understand that, based on factors such as parallelism and the presence of
any process functions, the "Insufficient number of network buffers"
exception might pop up.

My plan is to catch this exception inside the main program and restart the
job with a new config in which 'taskmanager.memory.network.min' and
'taskmanager.memory.network.max' are set to a an equal value, greater than
that in the previous run, which would allow the pipeline to run
successfully.

My question is whether there exists a way to find out the exact min value of
bytes which should be put as the value for both the previously mentioned min
and max keys from the error message (specifically, the part stating:
"required [...], but only [...] available. The total number of network
buffers is currently set to [...] of [...] bytes each").

Should it not be possible to use the info from the error message in a
formula to determine this value, I'd probably just end up restarting the job
as many times as necessary instead of at most once and double the value of
'taskmanager.memory.network.min' and 'taskmanager.memory.network.max' for
every consecutive retry.

Apologies if this has been answered before. I have seen a fair few questions
regarding this error before, but none of them seemed to ask about this
specifically.

Thanks in advance for any help!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   
Reply | Threaded
Open this post in threaded view
|

Re: Insufficient number of network buffers - rerun with min amount of required network memory

abelm
Hello Arvid Heise,

Thanks for replying! Based on your suggestion, I put together the following
snippet for the config:

  val config = new Configuration()

  private val newMemorySize =
config.get(TaskManagerOptions.MEMORY_SEGMENT_SIZE)
    .multiply(4)
    .multiply(config.get(TaskManagerOptions.NUM_TASK_SLOTS).toDouble)
    .multiply(parallelism.toDouble)
  config.set(TaskManagerOptions.NETWORK_MEMORY_MIN, newMemorySize)
  config.set(TaskManagerOptions.NETWORK_MEMORY_MAX, newMemorySize)

It seems to work as intended even with high parallelism, as long as process
functions are not involved. As soon as that is no longer the case, the value
resulting from the formula is too low, regardless of parallelism. How would
one go about accounting for that?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/