Insufficient number of network buffers- what does Total mean on the Flink Dashboard

Vijay Balakrishnan
Hi,
I am getting this error:
java.io.IOException: Insufficient number of network buffers: required 2, but only 0 available. The total number of network buffers is currently set to 877118 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.network.memory.fraction', 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#-1420732632]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.


Followed docs here:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/mem_setup.html

network = Min(max, Max(min, fraction x total))  // What does "total" mean? The docs say "The max JVM heap is used to derive the total memory for the calculation of network buffers." Can I see it in the Flink Dashboard? Is it the 117GB shown here?
= Min(50G, Max(500mb, 0.48 * 117G)) = Min(50G, 56.16G) = 50G
877118 buffers of 32768 bytes each comes to about 28.74GB. So why is it failing?
Used this in flink-conf.yaml:
    taskmanager.numberOfTaskSlots: 10
    rest.server.max-content-length: 314572800
    taskmanager.network.memory.fraction: 0.45
    taskmanager.network.memory.max: 50gb
    taskmanager.network.memory.min: 500mb
    akka.ask.timeout: 240s
    cluster.evenly-spread-out-slots: true
    akka.tcp.timeout: 240s
    taskmanager.network.request-backoff.initial: 5000
    taskmanager.network.request-backoff.max: 30000
    web.timeout: 1000000
    web.refresh-interval: 6000

I also saw an older formula for the number of buffers:
(slots/TM * slots/TM) * #TMs * 4
= 10 * 10 * 47 * 4 = 18,800 buffers.

What am I missing in the network buffer calculation?
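
The older rule of thumb quoted above can be written out as a short Python sketch. The function name and the `factor` parameter are illustrative names, not Flink APIs, and whether this rule of thumb fits this particular job is not established in the thread. The buffer size is taken from the error message.

```python
# Sketch of the older rule of thumb quoted in the post:
#   (slots per TM)^2 * number of TMs * 4
# The function name and the "factor" parameter are illustrative, not Flink APIs.

BUFFER_SIZE_BYTES = 32768  # buffer size reported in the error message


def estimate_network_buffers(slots_per_tm: int, num_task_managers: int, factor: int = 4) -> int:
    """Return the estimated number of network buffers for the whole cluster."""
    return slots_per_tm * slots_per_tm * num_task_managers * factor


required = estimate_network_buffers(slots_per_tm=10, num_task_managers=47)
required_mib = required * BUFFER_SIZE_BYTES / 1024 ** 2

print(required)      # 18800
print(required_mib)  # 587.5
```

By this estimate the job would need far fewer buffers than the 877118 that are configured, which is why the error looks surprising.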

TIA,



Attachments: Screen Shot 2020-06-11 at 4.37.44 PM.png, Screen Shot 2020-06-11 at 4.37.17 PM.png

Xintong Song
Hi Vijay,

The memory configuration in Flink 1.9 and earlier versions is indeed complicated and confusing. That is why we made significant changes to it in Flink 1.10. If possible, I would suggest upgrading to Flink 1.10, or to the upcoming Flink 1.11, which is very likely to be released this month.

Regarding your questions,
  • "Physical Memory" displayed on the web ui stands for the total memory on your machine. This information is retrieved from your OS. It is not related to the network memory calculation. It is displayed mainly for historical reasons.
  • The error message means that you have about 26.8 GB of network memory (877118 * 32768 bytes, counting 1 GB as 1024^3 bytes), and your job is trying to use more.
  • The "total memory" referred to in the network memory calculation is:
    • jvm-heap + network, if managed memory is configured on-heap (default)
      • According to your screenshot, the managed memory on-heap/off-heap configuration has not been changed, so this should be your case.
    • jvm-heap + managed + network, if managed memory is configured off-heap
  • The network memory size is actually derived in reverse. Flink reads the max heap size from the JVM (and the managed memory size from the configuration, if it is configured off-heap), and derives the network memory size with the following equation.
    • networkMem = Min(networkMax, Max(networkMin, jvmMaxHeap / (1-networkFraction) * networkFraction))
    • In your case, networkMem = Min(50GB, Max(500MB, 29GB / (1-0.48) * 0.48)) = 26.8GB
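
The derivation above can be checked with a small Python sketch. The function and variable names are illustrative only (they are not Flink identifiers), and the 29 GB heap is the approximate figure from this thread. The last lines show why the same buffer pool reads as about 28.74 GB in decimal units and about 26.8 GB in binary units.

```python
# Sketch of the reverse derivation described above (on-heap managed memory case).
# Names are illustrative; the 29 GB heap is the approximate value from the thread.

GIB = 1024 ** 3
MIB = 1024 ** 2


def derive_network_memory(jvm_max_heap: float, fraction: float, net_min: float, net_max: float) -> float:
    """networkMem = Min(networkMax, Max(networkMin, heap / (1 - fraction) * fraction))."""
    derived = jvm_max_heap / (1 - fraction) * fraction
    return min(net_max, max(net_min, derived))


network_bytes = derive_network_memory(
    jvm_max_heap=29 * GIB, fraction=0.48, net_min=500 * MIB, net_max=50 * GIB
)
print(round(network_bytes / GIB, 1))  # 26.8

# Size of the buffer pool reported in the error message.
reported = 877118 * 32768
print(reported)                   # 28741402624 bytes
print(round(reported / 1e9, 2))   # 28.74 (decimal GB)
print(round(reported / GIB, 2))   # 26.77 (binary GB)
```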
One thing I don't understand is why you only have a 29GB heap when "taskmanager.heap.size" is configured as "1044221m" (about 102 GB). The JVM heap size ("-Xmx" & "-Xms") is calculated as follows. I'll use "total" to represent "taskmanager.heap.size" for short. I have also omitted the calculations for the case where managed memory is configured off-heap.
  • Standalone: jvmHeap = total * (1 - networkFraction) = 102 GB * (1 - 0.48) = 53 GB
  • On Yarn: jvmHeap = (total - Max(cutoff-min, total * cutoff-ratio)) * (1 - networkFraction) = (102GB - Max(600MB, 102GB * 0.25)) * (1 - 0.48) = 39.8GB
Have you specified a custom "-Xmx" parameter?
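
As a rough check, the two heap formulas above can be evaluated in Python. This is only a sketch: the function names are illustrative, all sizes are in GB, and the inputs are the rounded values from this thread (102 GB total, fraction 0.48, 600 MB minimum cutoff, cutoff ratio 0.25). With those inputs it gives about 53.0 GB for standalone and about 39.8 GB on YARN.

```python
# Sketch of the Flink 1.9 heap-size formulas quoted above, with sizes in GB.
# Function names are illustrative; inputs are the rounded values from the thread.


def jvm_heap_standalone(total_gb: float, network_fraction: float) -> float:
    """jvmHeap = total * (1 - networkFraction)."""
    return total_gb * (1 - network_fraction)


def jvm_heap_yarn(total_gb: float, network_fraction: float,
                  cutoff_min_gb: float, cutoff_ratio: float) -> float:
    """jvmHeap = (total - Max(cutoff-min, total * cutoff-ratio)) * (1 - networkFraction)."""
    cutoff = max(cutoff_min_gb, total_gb * cutoff_ratio)
    return (total_gb - cutoff) * (1 - network_fraction)


standalone = jvm_heap_standalone(102, 0.48)
yarn = jvm_heap_yarn(102, 0.48, cutoff_min_gb=600 / 1024, cutoff_ratio=0.25)

print(round(standalone, 1))  # 53.0
print(round(yarn, 1))        # 39.8
```

Either result is well above the 29GB heap actually observed, which is what prompts the question about a custom "-Xmx".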

Thank you~

Xintong Song




Vijay Balakrishnan
Thanks, Xintong, for a great answer. Much appreciated.

Max heap: if -Xmx is set, it is that value; otherwise it is ¼ of the physical machine memory, as estimated by the JVM.

No -Xmx is set. So it should be 1/4 of 102GB = 25.5GB, but I am not sure about the 29GB figure.


Xintong Song
Flink should have calculated the heap size and set "-Xmx" and "-Xms" according to the equations I mentioned. So unless you have set a custom -Xmx that overrides this, it should not use the default of 1/4 of physical memory.
  • Standalone: jvmHeap = total * (1 - networkFraction) = 102 GB * (1 - 0.48) = 53 GB
  • On Yarn: jvmHeap = (total - Max(cutoff-min, total * cutoff-ratio)) * (1 - networkFraction) = (102GB - Max(600MB, 102GB * 0.25)) * (1 - 0.48) = 39.8GB


Are you running Flink on Mesos? I think Flink does not automatically set -Xmx on Mesos.


BTW, from your screenshot the physical memory is 123GB, so 1/4 of that is much closer to 29GB, allowing for some rounding errors and loss of accuracy.
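
A minimal sketch of the fallback discussed here, assuming the simplified rule from this thread that a JVM started without "-Xmx" uses one quarter of physical memory as its max heap. The real JVM ergonomics depend on the JVM version and flags, and the function name is illustrative.

```python
# Simplified assumption from the thread: without -Xmx, the JVM picks
# max heap = 1/4 of physical memory. Real JVM ergonomics can differ.


def default_max_heap_gb(physical_memory_gb: float) -> float:
    """Return the assumed default max heap for a machine of the given size."""
    return physical_memory_gb / 4


print(default_max_heap_gb(102))  # 25.5, using the configured taskmanager.heap.size
print(default_max_heap_gb(123))  # 30.75, using the physical memory from the screenshot
```

The second value is the one to compare with the observed 29GB heap, since the fallback is based on physical memory rather than on "taskmanager.heap.size".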


Thank you~

Xintong Song




Vijay Balakrishnan
Hi Xintong,
Just to be clear: I haven't set any -Xmx. I will check our scripts again.
Assuming no -Xmx is set, the doc above says 1/4 of physical memory, i.e. 29GB, will be used.

So, if I set env.java.opts: "-Xmx102g" in flink-conf.yaml, I assume a max heap of 102GB will be used in the network memory calculation.
Is that the right way to set env.java.opts?
TIA,
Vijay


Xintong Song
> Assuming no -Xmx is set, the doc above says 1/4 of physical memory, i.e. 29GB, will be used.

This is true.

> So, if I set env.java.opts: "-Xmx102g" in flink-conf.yaml, I assume a max heap of 102GB will be used in the network memory calculation.
> Is that the right way to set env.java.opts?

I cannot be sure. I just checked, and it seems that even on Mesos "-Xmx" should be set. So technically, Flink should always have set "-Xmx". If you are using a custom shell script to launch the task manager processes, then I cannot tell whether "env.java.opts" works for you.

Thank you~

Xintong Song


