Hello again,
I'm facing a problem with my program's execution. If I do not set the "numberOfBuffers" variable in flink-conf.yaml, my program fails with the following exception:

"Job execution switched to status FAILED Error: The program execution failed: dbis72 has not enough buffers to safely execute CoGroup (org.apache.flink.allIn.PSIJoinJob$CoGroupSignatures) (154 buffers missing) ....."

I tried setting "numberOfBuffers" to 404800, and my program finished without exceptions (in 535.59 seconds). Next I set "numberOfBuffers" to 40480, and the execution was much faster: 75.19 seconds. It seems that the number of buffers has a big impact on the runtime.

Is there a way to automatically minimize the number of network buffers for my program?

I would welcome your response.

Greetings
Florian
Hi Florian,

There is this really old picture that shows how Flink uses memory: https://camo.githubusercontent.com/2f8c7ea5e2b9e25a3e9995c573e5666b6baf920a/68747470733a2f2f662e636c6f75642e6769746875622e636f6d2f6173736574732f3635393536342f313238383836372f31393939366533652d333031382d313165332d393362622d6464366266636134656431652e706e67

When you start the TaskManager JVM, you have to tell the JVM how much memory it gets. That's the "taskmanager.heap.mb" value. Let's say you have 20 GB there, so your Flink TaskManager has a total of 20 GB at hand that we can use. From that memory, you have to subtract:

a) the memory for the network buffers, which is taskmanager.network.numberOfBuffers * taskmanager.network.bufferSizeInBytes;
b) the memory that we use for data processing (sorting, joins, iterations). This amount is determined by "taskmanager.memory.fraction". By default, it's 0.7, so we use 70% of the memory that is free AFTER a) was allocated.

In your case, you had two values for numberOfBuffers (with bufferSizeInBytes = 32768):

404800 * 32768 ==> 13.2645 GB
40480 * 32768 ==> 1.32645 GB

So with 404800 buffers you had 13 GB allocated only for network buffers. Only about 6.7 GB were left, and after taking 70% of that (-30%), about 4.7 GB remained for data processing. In the other case, you had about 18.67 GB left, so in the end you had about 13 GB for data processing.

Flink uses as much memory as possible to process data in memory. But if less memory is available, we have to spill to disk earlier.

"Is there a way to automatically minimize the number of network buffers for my program?"

Not yet ;) We are working on making the memory assignment dynamic so that users don't have to specify a fixed number of network buffers up front. I think Ufuk can send you the secret formula to calculate the number of buffers required ;)

Best,
Robert

On Fri, Oct 10, 2014 at 5:05 PM, Florian Hönicke <[hidden email]> wrote:
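The memory split Robert describes (network buffers first, then taskmanager.memory.fraction of what is left) can be reproduced with a minimal Python sketch. It assumes a 20 GB heap counted in decimal gigabytes (1 GB = 10**9 bytes), which is how the figures in the reply come out. It also assumes a buffer size of 32768 bytes and the default fraction of 0.7. The function name memory_split is hypothetical and is not part of any Flink API.

```python
# Hypothetical helper illustrating the TaskManager memory split described above.
# Assumption: decimal gigabytes (1 GB = 10**9 bytes), matching the thread's figures.

GB = 10**9


def memory_split(heap_bytes, number_of_buffers, buffer_size_bytes=32768, memory_fraction=0.7):
    """Return (network_bytes, processing_bytes) for a given TaskManager heap.

    network_bytes    = numberOfBuffers * bufferSizeInBytes
    processing_bytes = memory_fraction * (heap_bytes - network_bytes)
    """
    network = number_of_buffers * buffer_size_bytes
    if network >= heap_bytes:
        raise ValueError("network buffers alone would exceed the heap")
    processing = memory_fraction * (heap_bytes - network)
    return network, processing


if __name__ == "__main__":
    heap = 20 * GB
    for buffers in (404800, 40480):
        network, processing = memory_split(heap, buffers)
        print(f"{buffers:>6} buffers: network {network / GB:.2f} GB, "
              f"data processing {processing / GB:.2f} GB")
```

With these assumptions, 404800 buffers leave about 4.71 GB for data processing, while 40480 buffers leave about 13.07 GB. That gap is consistent with the faster runtime Florian observed, because less data has to spill to disk.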