optimal number of buffers


Rockstar Flo
Hello again,

I'm facing a problem in my program execution.
If I do not define the "numberOfBuffers" variable in the flink-conf.yaml, my program execution fails with the following exception:
"
Job execution switched to status FAILED
Error: The program execution failed: dbis72 has not enough buffers to safely execute CoGroup (org.apache.flink.allIn.PSIJoinJob$CoGroupSignatures) (154 buffers missing)
.....
"
I tried setting "numberOfBuffers" to 404800.
My program finished without exceptions (in 535.59 seconds).
Next I tried setting "numberOfBuffers" to 40480.
The program execution was much faster: 75.19 seconds.
It seems the number of buffers has a large impact on the runtime.

Is there a way to automatically minimize the number of network buffers for my program?

I would welcome your response.
Greetings Florian


Re: optimal number of buffers

rmetzger0
Hi Florian,


When you start the TaskManager JVM, you have to tell the JVM how much memory it gets.
That's the "taskmanager.heap.mb" value. Let's say you have 20 GB there.
So your Flink TaskManager has a total of 20 GB at hand that we can use.
From that memory, you have to subtract
a) The memory for the network buffers. That is taskmanager.network.numberOfBuffers * taskmanager.network.bufferSizeInBytes.
b) The memory that we use for data processing (sorting, joins, iterations). This amount is determined by "taskmanager.memory.fraction". By default, it's 0.7, so we use 70% of the memory that is still free AFTER a) was allocated.

In your case, you tried two values for numberOfBuffers. With bufferSizeInBytes = 32768, numberOfBuffers * bufferSizeInBytes gives:
404800 * 32768 bytes ==> 13.2645 GB
 40480 * 32768 bytes ==> 1.32645 GB

So in the first case you had about 13.3 GB of memory allocated to network buffers alone. That left only about 6.7 GB, and 70% of that is roughly 4.7 GB for data processing.
In the other case, you had about 18.7 GB left, so in the end you had roughly 13 GB for data processing.
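
The arithmetic above can be sketched in a few lines of Python. This is only an illustration of the calculation, not Flink code: the function name memory_split is made up, 20 GB is the example heap size from above, and GB is taken as 10^9 bytes to match the numbers in this mail. With unrounded inputs the two cases come out near 4.7 GB and 13.1 GB for data processing.

```python
# Rough sketch of the TaskManager memory split described above.
# Assumptions (not Flink API): the helper name, a 20 GB heap, GB = 10**9 bytes.

GB = 10**9


def memory_split(heap_bytes, num_buffers, buffer_size=32768, memory_fraction=0.7):
    """Return (network_bytes, data_processing_bytes).

    network_bytes         = numberOfBuffers * bufferSizeInBytes
    data_processing_bytes = memory_fraction * (heap - network_bytes)
    """
    network = num_buffers * buffer_size
    if network > heap_bytes:
        raise ValueError("network buffers do not fit into the heap")
    data_processing = (heap_bytes - network) * memory_fraction
    return network, data_processing


if __name__ == "__main__":
    for n in (404800, 40480):
        network, data_processing = memory_split(20 * GB, n)
        print(f"{n:>6} buffers: {network / GB:.2f} GB network, "
              f"{data_processing / GB:.2f} GB data processing")
```

Plugging in your own heap size and buffer count shows quickly how much memory a given numberOfBuffers setting takes away from sorting and joins.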

Flink uses as much memory as possible to process the data in-memory. When less memory is available, we have to spill to disk earlier, which is why your run with the larger number of buffers was so much slower.

"Is there a way to automatically minimize the number of network buffers for my program?"
Not yet ;) We are working on making the memory assignment dynamic so that users don't have to specify a fixed amount of network buffers in the beginning.

I think Ufuk can send you the secret formula to calculate the number of buffers required ;)

Best,
Robert



On Fri, Oct 10, 2014 at 5:05 PM, Florian Hönicke <[hidden email]> wrote: