Setting taskmanager.network.numberOfBuffers and getting errors...

Sourigna Phetsarath
All:

I'm running a Flink 0.10.2 app by submitting it to YARN as an application. I'm using an AWS EMR cluster with 1 master and 10 d2.8xlarge nodes. When I submit the job using:


bin/flink run \
    -m yarn-cluster \
    -yjm 20480 \
    -yn 10 \
    -ytm 80960 \
    -ys 36 \
    -yD taskmanager.network.numberOfBuffers=51840 \
...

I'm seeing this error:

Caused by: java.io.IOException: Insufficient number of network buffers: required 360, but only 315 available. The total number of network buffers is currently set to 51840. You can increase this number by setting the configuration key 'taskmanager.network.numberOfBuffers'.
    at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:196)
    at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:325)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:488)
    at java.lang.Thread.run(Thread.java:745)


The error message does not seem to convey the correct information.

Can someone explain what reasonable values are for taskmanager.network.numberOfBuffers and taskmanager.network.bufferSizeInBytes?

But I am still unclear on the calculation. Is it supposed to be:

 #cores ^ 2 * #machines * 4

So, in my case, 36 ^ 2 * 10 * 4 = 51840.

Thanks in advance for any help you can provide.

--

Gna Phetsarath
System Architect // AOL Platforms // Data Services // Applied Research Chapter
770 Broadway, 5th Floor, New York, NY 10003
o: 212.402.4871 // m: 917.373.7363
vvmr: 8890237 
aim: sphetsarath20 t: @sourigna



Re: Setting taskmanager.network.numberOfBuffers and getting errors...

Fabian Hueske
Hi Sourigna,

you are using the formula correctly: #cores should be translated into slots per TaskManager (TM), and #machines into the number of TMs. So 36 ^ 2 * 10 * 4 = 51840 appears to be right.
The constant 4 refers to the total number of concurrently active full network shuffles (partitioning or broadcasting). If your job is more complex, e.g., if it has several inputs that are joined, reduced, etc., the constant needs to be adjusted accordingly.
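The rule of thumb above can be written down as a small helper, for example to see how the total grows when a job has more concurrently active shuffles. This is only a sketch: the class and method names are made up for illustration, passing the shuffle constant as a parameter is my own framing of "the constant needs to be adjusted", and 8 shuffles is an arbitrary example value, not a recommendation.

```java
public class NetworkBufferEstimate {

    // Rule of thumb from this thread:
    //   slotsPerTaskManager^2 * numberOfTaskManagers * concurrentShuffles
    // concurrentShuffles is the "constant 4"; raise it for jobs with
    // more simultaneously active partitioning/broadcasting exchanges.
    static long recommendedBuffers(int slotsPerTaskManager, int numberOfTaskManagers, int concurrentShuffles) {
        long slots = slotsPerTaskManager; // widen to long before multiplying
        return slots * slots * numberOfTaskManagers * concurrentShuffles;
    }

    public static void main(String[] args) {
        // -ys 36 (slots per TM) and -yn 10 (number of TMs) from the original command
        System.out.println(recommendedBuffers(36, 10, 4)); // 51840
        // hypothetical job with twice as many concurrent shuffles
        System.out.println(recommendedBuffers(36, 10, 8)); // 103680
    }
}
```

Since the count grows with the square of the slots per TM, lowering -ys reduces it much faster than lowering -yn.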

The high number of network buffers is due to Flink's pipelined data exchange: sending tasks ship records to the receiving tasks while the data is still being produced. Pipelining can significantly improve job performance, but at high parallelism it requires quite a bit of memory. Hadoop and Spark use a different technique to ship data: they collect data on the sender and ship it to the receiver in batches. This technique is less memory-intensive but has higher latency. Flink also supports batched data exchange. If you do not want to allocate so much memory for pipelined shuffles, you can enable batched data exchange by calling:

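import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.java.ExecutionEnvironment;

ExecutionEnvironment env = ...
// execute shuffles and broadcasts in batched instead of pipelined fashion
env.getConfig().setExecutionMode(ExecutionMode.BATCH);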
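To put "quite a bit of memory" into numbers, here is a rough sketch of what the buffer pool from the original command reserves. It assumes the pool takes numberOfBuffers * bufferSizeInBytes per TaskManager and a buffer size of 32768 bytes (32 KiB), which I believe was the default in this Flink line; both are assumptions, so check the configuration docs for your version. The class and method names are hypothetical.

```java
public class NetworkBufferMemory {

    // Memory one TaskManager reserves for network buffers:
    //   numberOfBuffers * bufferSizeInBytes
    static long networkMemoryBytes(long numberOfBuffers, long bufferSizeInBytes) {
        return numberOfBuffers * bufferSizeInBytes;
    }

    public static void main(String[] args) {
        long bufferSize = 32 * 1024; // assumed taskmanager.network.bufferSizeInBytes (32 KiB)

        // taskmanager.network.numberOfBuffers=51840 from the original command
        long current = networkMemoryBytes(51840, bufferSize);
        System.out.println(current / (1024 * 1024) + " MiB per TaskManager"); // 1620 MiB per TaskManager

        // hypothetical doubled buffer count (8 concurrent shuffles instead of 4)
        long doubled = networkMemoryBytes(103680, bufferSize);
        System.out.println(doubled / (1024 * 1024) + " MiB per TaskManager"); // 3240 MiB per TaskManager
    }
}
```

The memory scales linearly with both settings, so a smaller bufferSizeInBytes lets you raise numberOfBuffers without reserving more memory, at the cost of smaller network transfers.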

Best,
Fabian

In reply to Sourigna Phetsarath's message of 2016-03-03 22:27 GMT+01:00.