Hello,
Next issue in a string of things I'm solving is that my application fails with the message 'Connection unexpectedly closed by remote task manager'.
Yarn log shows the following: Container [pid=4102,containerID=container_1461341357870_0004_01_000015] is running beyond physical memory limits. Current usage: 2.5 GB of 2.5 GB physical memory used; 9.0 GB of 12.3 GB virtual memory used. Killing container. Dump of the process-tree for container_1461341357870_0004_01_000015 : |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE |- 4102 4100 4102 4102 (bash) 1 7 115806208 715 /bin/bash -c /usr/lib/jvm/java-1.8.0/bin/java -Xms570m -Xmx570m -XX:MaxDirectMemorySize=1900m -Dlog.file=/var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_000015/taskmanager.log -Dlogback.configurationFile=file:logback.xml -Dlog4j.configuration=file:log4j.properties org.apache.flink.yarn.YarnTaskManagerRunner --configDir . 1> /var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_000015/taskmanager.out 2> /var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_000015/taskmanager.err |- 4306 4102 4102 4102 (java) 172258 40265 9495257088 646460 /usr/lib/jvm/java-1.8.0/bin/java -Xms570m -Xmx570m -XX:MaxDirectMemorySize=1900m -Dlog.file=/var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_000015/taskmanager.log -Dlogback.configurationFile=file:logback.xml -Dlog4j.configuration=file:log4j.properties org.apache.flink.yarn.YarnTaskManagerRunner --configDir . One thing that drew my attention is `-Xmx570m`. I expected it to be TaskManagerMemory*0.75 (due to yarn.heap-cutoff-ratio). I run the application as follows: HADOOP_CONF_DIR=/etc/hadoop/conf flink run -m yarn-cluster -yn 18 -yjm 4096 -ytm 2500 eval-assembly-1.0.jar In flink logs I do see 'Task Manager memory: 2500'. When I look at the yarn container logs on the cluster node I see that it starts with 570mb, which puzzles me. When I look at the actually allocated memory for a Yarn container using 'top' I see 2.2GB used. Am I interpreting these parameters correctly? I also have set (it failed in the same way without this as well): taskmanager.memory.off-heap: true Also, I don't understand why this happens at all. I assumed that Flink won't overcommit allocated resources and will spill to the disk when running out of heap memory. Appreciate if someone can shed light on this too. Thanks, Timur |
Hi Timur, The reason why we only allocate 570mb for the heap is because you are allocating most of the memory as off heap (direct byte buffers). In theory, the memory footprint of the JVM is limited to 570 (heap) + 1900 (direct mem) = 2470 MB (which is below 2500). But in practice thje JVM is allocating more memory, causing these killings by YARN. I have to check the code of Flink again, because I would expect the safety boundary to be much larger than 30 mb. Regards, Robert On Fri, Apr 22, 2016 at 9:47 PM, Timur Fayruzov <[hidden email]> wrote:
|
Hi Timur,
Which version of Flink are you using? Could you share the entire logs? Thanks, Max On Mon, Apr 25, 2016 at 12:05 PM, Robert Metzger <[hidden email]> wrote: > Hi Timur, > > The reason why we only allocate 570mb for the heap is because you are > allocating most of the memory as off heap (direct byte buffers). > > In theory, the memory footprint of the JVM is limited to 570 (heap) + 1900 > (direct mem) = 2470 MB (which is below 2500). But in practice thje JVM is > allocating more memory, causing these killings by YARN. > > I have to check the code of Flink again, because I would expect the safety > boundary to be much larger than 30 mb. > > Regards, > Robert > > > On Fri, Apr 22, 2016 at 9:47 PM, Timur Fayruzov <[hidden email]> > wrote: >> >> Hello, >> >> Next issue in a string of things I'm solving is that my application fails >> with the message 'Connection unexpectedly closed by remote task manager'. >> >> Yarn log shows the following: >> >> Container [pid=4102,containerID=container_1461341357870_0004_01_000015] is >> running beyond physical memory limits. Current usage: 2.5 GB of 2.5 GB >> physical memory used; 9.0 GB of 12.3 GB virtual memory used. Killing >> container. >> Dump of the process-tree for container_1461341357870_0004_01_000015 : >> |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) >> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE >> |- 4102 4100 4102 4102 (bash) 1 7 115806208 715 /bin/bash -c >> /usr/lib/jvm/java-1.8.0/bin/java -Xms570m -Xmx570m >> -XX:MaxDirectMemorySize=1900m >> -Dlog.file=/var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_000015/taskmanager.log >> -Dlogback.configurationFile=file:logback.xml >> -Dlog4j.configuration=file:log4j.properties >> org.apache.flink.yarn.YarnTaskManagerRunner --configDir . 1> >> /var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_000015/taskmanager.out >> 2> >> /var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_000015/taskmanager.err >> |- 4306 4102 4102 4102 (java) 172258 40265 9495257088 646460 >> /usr/lib/jvm/java-1.8.0/bin/java -Xms570m -Xmx570m >> -XX:MaxDirectMemorySize=1900m >> -Dlog.file=/var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_000015/taskmanager.log >> -Dlogback.configurationFile=file:logback.xml >> -Dlog4j.configuration=file:log4j.properties >> org.apache.flink.yarn.YarnTaskManagerRunner --configDir . >> >> One thing that drew my attention is `-Xmx570m`. I expected it to be >> TaskManagerMemory*0.75 (due to yarn.heap-cutoff-ratio). I run the >> application as follows: >> HADOOP_CONF_DIR=/etc/hadoop/conf flink run -m yarn-cluster -yn 18 -yjm >> 4096 -ytm 2500 eval-assembly-1.0.jar >> >> In flink logs I do see 'Task Manager memory: 2500'. When I look at the >> yarn container logs on the cluster node I see that it starts with 570mb, >> which puzzles me. When I look at the actually allocated memory for a Yarn >> container using 'top' I see 2.2GB used. Am I interpreting these parameters >> correctly? >> >> I also have set (it failed in the same way without this as well): >> taskmanager.memory.off-heap: true >> >> Also, I don't understand why this happens at all. I assumed that Flink >> won't overcommit allocated resources and will spill to the disk when running >> out of heap memory. Appreciate if someone can shed light on this too. >> >> Thanks, >> Timur > > |
Hello Maximilian, I'm using 1.0.0 compiled with Scala 2.11 and Hadoop 2.7. I'm running this on EMR. I didn't see any exceptions in other logs. What are the logs you are interested in? Thanks, Timur On Mon, Apr 25, 2016 at 3:44 AM, Maximilian Michels <[hidden email]> wrote: Hi Timur, |
Hi Timur,
Shedding some light on the memory calculation: You have a total memory size of 2500 MB for each TaskManager. The default for 'taskmanager.memory.fraction' is 0.7. This is the fraction of the memory used by the memory manager. When you have turned on off-heap memory, this memory is allocated off-heap. As you pointed out, the default Yarn cutoff ratio is 0.25. Memory cutoff for Yarn: 2500 * 0.25 MB = 625 MB Java heap size with off-heap disabled: 2500 MB - 625 MB = 1875 MB Java heap size with off-heap enabled: (2500 MB - 625 MB) * 0.3 = 562,5 MB (~570 MB in your case) Off-heap memory size: (2500 MB - 625 MB) * 0.7 = 1312,5 MB The heap memory limits in your log seem to be calculated correctly. Note that we don't set a strict limit for the off-heap memory because the Flink memory manager controls the amount of memory allocated. It will preallocate memory when you have 'taskmanager.memory.preallocate' set to true. Otherwise it will allocate dynamically. Still, you should have about 500 MB memory left with everything allocated. There is some more direct (off-heap) memory allocated for the network stack adjustable with 'taskmanager.network.numberOfBuffers' which is set to 2048 by default and corresponds to 2048 * 32 KB = 64 MB memory. I believe this can grow up to twice of that size. Still, should be enough memory left. Are you running a streaming or batch job? Off-heap memory and memory preallocation are mostly beneficial for batch jobs which use the memory manager a lot for sorting, hashing and caching. For streaming I'd suggest to use Flink's defaults: taskmanager.memory.off-heap: false taskmanager.memory.preallocate: false Raising the cutoff ratio should prevent killing of the TaskManagers. As Robert mentioned, in practice the JVM tends to allocate more than the maximum specified heap size. You can put the following in your flink-conf.yaml: # slightly raise the cut off ratio (might need to be even higher) yarn.heap-cutoff-ratio: 0.3 Thanks, Max On Mon, Apr 25, 2016 at 5:52 PM, Timur Fayruzov <[hidden email]> wrote: > Hello Maximilian, > > I'm using 1.0.0 compiled with Scala 2.11 and Hadoop 2.7. I'm running this on > EMR. I didn't see any exceptions in other logs. What are the logs you are > interested in? > > Thanks, > Timur > > On Mon, Apr 25, 2016 at 3:44 AM, Maximilian Michels <[hidden email]> wrote: >> >> Hi Timur, >> >> Which version of Flink are you using? Could you share the entire logs? >> >> Thanks, >> Max >> >> On Mon, Apr 25, 2016 at 12:05 PM, Robert Metzger <[hidden email]> >> wrote: >> > Hi Timur, >> > >> > The reason why we only allocate 570mb for the heap is because you are >> > allocating most of the memory as off heap (direct byte buffers). >> > >> > In theory, the memory footprint of the JVM is limited to 570 (heap) + >> > 1900 >> > (direct mem) = 2470 MB (which is below 2500). But in practice thje JVM >> > is >> > allocating more memory, causing these killings by YARN. >> > >> > I have to check the code of Flink again, because I would expect the >> > safety >> > boundary to be much larger than 30 mb. >> > >> > Regards, >> > Robert >> > >> > >> > On Fri, Apr 22, 2016 at 9:47 PM, Timur Fayruzov >> > <[hidden email]> >> > wrote: >> >> >> >> Hello, >> >> >> >> Next issue in a string of things I'm solving is that my application >> >> fails >> >> with the message 'Connection unexpectedly closed by remote task >> >> manager'. >> >> >> >> Yarn log shows the following: >> >> >> >> Container [pid=4102,containerID=container_1461341357870_0004_01_000015] >> >> is >> >> running beyond physical memory limits. Current usage: 2.5 GB of 2.5 GB >> >> physical memory used; 9.0 GB of 12.3 GB virtual memory used. Killing >> >> container. >> >> Dump of the process-tree for container_1461341357870_0004_01_000015 : >> >> |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) >> >> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE >> >> |- 4102 4100 4102 4102 (bash) 1 7 115806208 715 /bin/bash -c >> >> /usr/lib/jvm/java-1.8.0/bin/java -Xms570m -Xmx570m >> >> -XX:MaxDirectMemorySize=1900m >> >> >> >> -Dlog.file=/var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_000015/taskmanager.log >> >> -Dlogback.configurationFile=file:logback.xml >> >> -Dlog4j.configuration=file:log4j.properties >> >> org.apache.flink.yarn.YarnTaskManagerRunner --configDir . 1> >> >> >> >> /var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_000015/taskmanager.out >> >> 2> >> >> >> >> /var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_000015/taskmanager.err >> >> |- 4306 4102 4102 4102 (java) 172258 40265 9495257088 646460 >> >> /usr/lib/jvm/java-1.8.0/bin/java -Xms570m -Xmx570m >> >> -XX:MaxDirectMemorySize=1900m >> >> >> >> -Dlog.file=/var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_000015/taskmanager.log >> >> -Dlogback.configurationFile=file:logback.xml >> >> -Dlog4j.configuration=file:log4j.properties >> >> org.apache.flink.yarn.YarnTaskManagerRunner --configDir . >> >> >> >> One thing that drew my attention is `-Xmx570m`. I expected it to be >> >> TaskManagerMemory*0.75 (due to yarn.heap-cutoff-ratio). I run the >> >> application as follows: >> >> HADOOP_CONF_DIR=/etc/hadoop/conf flink run -m yarn-cluster -yn 18 -yjm >> >> 4096 -ytm 2500 eval-assembly-1.0.jar >> >> >> >> In flink logs I do see 'Task Manager memory: 2500'. When I look at the >> >> yarn container logs on the cluster node I see that it starts with >> >> 570mb, >> >> which puzzles me. When I look at the actually allocated memory for a >> >> Yarn >> >> container using 'top' I see 2.2GB used. Am I interpreting these >> >> parameters >> >> correctly? >> >> >> >> I also have set (it failed in the same way without this as well): >> >> taskmanager.memory.off-heap: true >> >> >> >> Also, I don't understand why this happens at all. I assumed that Flink >> >> won't overcommit allocated resources and will spill to the disk when >> >> running >> >> out of heap memory. Appreciate if someone can shed light on this too. >> >> >> >> Thanks, >> >> Timur >> > >> > > > |
Great answer, thanks you Max for a very detailed explanation! Illuminating how off-heap parameter affects the memory allocation. I read this post: https://blogs.oracle.com/jrockit/entry/why_is_my_jvm_process_larger_t and the thing that jumped on me is the allocation of memory for jni libs. I do use a native library in my application, which is likely the culprit. I need to account for its memory footprint when doing my memory calculations. Thanks, Timur On Mon, Apr 25, 2016 at 10:28 AM, Maximilian Michels <[hidden email]> wrote: Hi Timur, |
Hi Timur,
Indeed, if you use JNI libraries then the memory will be off-heap and the -XmX limit will not be respected. Currently, we don't expect users to use JNI memory allocation. We might want to enforce a more strict direct memory limit in the future. In this case, you would get an OutOfMemoryException before Yarn could kill the container. Both are not nice to have :) You will have to adjust 'yarn.heap-cutoff-ratio' or 'yarn.heap-cutoff-min' (for an absolute memory cutoff) to adjust to your JNI memory needs. Cheers, Max On Mon, Apr 25, 2016 at 8:27 PM, Timur Fayruzov <[hidden email]> wrote: > Great answer, thanks you Max for a very detailed explanation! Illuminating > how off-heap parameter affects the memory allocation. > > I read this post: > https://blogs.oracle.com/jrockit/entry/why_is_my_jvm_process_larger_t > > and the thing that jumped on me is the allocation of memory for jni libs. I > do use a native library in my application, which is likely the culprit. I > need to account for its memory footprint when doing my memory calculations. > > Thanks, > Timur > > > On Mon, Apr 25, 2016 at 10:28 AM, Maximilian Michels <[hidden email]> wrote: >> >> Hi Timur, >> >> Shedding some light on the memory calculation: >> >> You have a total memory size of 2500 MB for each TaskManager. The >> default for 'taskmanager.memory.fraction' is 0.7. This is the fraction >> of the memory used by the memory manager. When you have turned on >> off-heap memory, this memory is allocated off-heap. As you pointed >> out, the default Yarn cutoff ratio is 0.25. >> >> Memory cutoff for Yarn: 2500 * 0.25 MB = 625 MB >> >> Java heap size with off-heap disabled: 2500 MB - 625 MB = 1875 MB >> >> Java heap size with off-heap enabled: (2500 MB - 625 MB) * 0.3 = 562,5 >> MB (~570 MB in your case) >> Off-heap memory size: (2500 MB - 625 MB) * 0.7 = 1312,5 MB >> >> The heap memory limits in your log seem to be calculated correctly. >> Note that we don't set a strict limit for the off-heap memory because >> the Flink memory manager controls the amount of memory allocated. It >> will preallocate memory when you have 'taskmanager.memory.preallocate' >> set to true. Otherwise it will allocate dynamically. Still, you should >> have about 500 MB memory left with everything allocated. There is some >> more direct (off-heap) memory allocated for the network stack >> adjustable with 'taskmanager.network.numberOfBuffers' which is set to >> 2048 by default and corresponds to 2048 * 32 KB = 64 MB memory. I >> believe this can grow up to twice of that size. Still, should be >> enough memory left. >> >> Are you running a streaming or batch job? Off-heap memory and memory >> preallocation are mostly beneficial for batch jobs which use the >> memory manager a lot for sorting, hashing and caching. >> >> For streaming I'd suggest to use Flink's defaults: >> >> taskmanager.memory.off-heap: false >> taskmanager.memory.preallocate: false >> >> Raising the cutoff ratio should prevent killing of the TaskManagers. >> As Robert mentioned, in practice the JVM tends to allocate more than >> the maximum specified heap size. You can put the following in your >> flink-conf.yaml: >> >> # slightly raise the cut off ratio (might need to be even higher) >> yarn.heap-cutoff-ratio: 0.3 >> >> Thanks, >> Max >> >> On Mon, Apr 25, 2016 at 5:52 PM, Timur Fayruzov >> <[hidden email]> wrote: >> > Hello Maximilian, >> > >> > I'm using 1.0.0 compiled with Scala 2.11 and Hadoop 2.7. I'm running >> > this on >> > EMR. I didn't see any exceptions in other logs. What are the logs you >> > are >> > interested in? >> > >> > Thanks, >> > Timur >> > >> > On Mon, Apr 25, 2016 at 3:44 AM, Maximilian Michels <[hidden email]> >> > wrote: >> >> >> >> Hi Timur, >> >> >> >> Which version of Flink are you using? Could you share the entire logs? >> >> >> >> Thanks, >> >> Max >> >> >> >> On Mon, Apr 25, 2016 at 12:05 PM, Robert Metzger <[hidden email]> >> >> wrote: >> >> > Hi Timur, >> >> > >> >> > The reason why we only allocate 570mb for the heap is because you are >> >> > allocating most of the memory as off heap (direct byte buffers). >> >> > >> >> > In theory, the memory footprint of the JVM is limited to 570 (heap) + >> >> > 1900 >> >> > (direct mem) = 2470 MB (which is below 2500). But in practice thje >> >> > JVM >> >> > is >> >> > allocating more memory, causing these killings by YARN. >> >> > >> >> > I have to check the code of Flink again, because I would expect the >> >> > safety >> >> > boundary to be much larger than 30 mb. >> >> > >> >> > Regards, >> >> > Robert >> >> > >> >> > >> >> > On Fri, Apr 22, 2016 at 9:47 PM, Timur Fayruzov >> >> > <[hidden email]> >> >> > wrote: >> >> >> >> >> >> Hello, >> >> >> >> >> >> Next issue in a string of things I'm solving is that my application >> >> >> fails >> >> >> with the message 'Connection unexpectedly closed by remote task >> >> >> manager'. >> >> >> >> >> >> Yarn log shows the following: >> >> >> >> >> >> Container >> >> >> [pid=4102,containerID=container_1461341357870_0004_01_000015] >> >> >> is >> >> >> running beyond physical memory limits. Current usage: 2.5 GB of 2.5 >> >> >> GB >> >> >> physical memory used; 9.0 GB of 12.3 GB virtual memory used. Killing >> >> >> container. >> >> >> Dump of the process-tree for container_1461341357870_0004_01_000015 >> >> >> : >> >> >> |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) >> >> >> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) >> >> >> FULL_CMD_LINE >> >> >> |- 4102 4100 4102 4102 (bash) 1 7 115806208 715 /bin/bash -c >> >> >> /usr/lib/jvm/java-1.8.0/bin/java -Xms570m -Xmx570m >> >> >> -XX:MaxDirectMemorySize=1900m >> >> >> >> >> >> >> >> >> -Dlog.file=/var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_000015/taskmanager.log >> >> >> -Dlogback.configurationFile=file:logback.xml >> >> >> -Dlog4j.configuration=file:log4j.properties >> >> >> org.apache.flink.yarn.YarnTaskManagerRunner --configDir . 1> >> >> >> >> >> >> >> >> >> /var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_000015/taskmanager.out >> >> >> 2> >> >> >> >> >> >> >> >> >> /var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_000015/taskmanager.err >> >> >> |- 4306 4102 4102 4102 (java) 172258 40265 9495257088 646460 >> >> >> /usr/lib/jvm/java-1.8.0/bin/java -Xms570m -Xmx570m >> >> >> -XX:MaxDirectMemorySize=1900m >> >> >> >> >> >> >> >> >> -Dlog.file=/var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_000015/taskmanager.log >> >> >> -Dlogback.configurationFile=file:logback.xml >> >> >> -Dlog4j.configuration=file:log4j.properties >> >> >> org.apache.flink.yarn.YarnTaskManagerRunner --configDir . >> >> >> >> >> >> One thing that drew my attention is `-Xmx570m`. I expected it to be >> >> >> TaskManagerMemory*0.75 (due to yarn.heap-cutoff-ratio). I run the >> >> >> application as follows: >> >> >> HADOOP_CONF_DIR=/etc/hadoop/conf flink run -m yarn-cluster -yn 18 >> >> >> -yjm >> >> >> 4096 -ytm 2500 eval-assembly-1.0.jar >> >> >> >> >> >> In flink logs I do see 'Task Manager memory: 2500'. When I look at >> >> >> the >> >> >> yarn container logs on the cluster node I see that it starts with >> >> >> 570mb, >> >> >> which puzzles me. When I look at the actually allocated memory for a >> >> >> Yarn >> >> >> container using 'top' I see 2.2GB used. Am I interpreting these >> >> >> parameters >> >> >> correctly? >> >> >> >> >> >> I also have set (it failed in the same way without this as well): >> >> >> taskmanager.memory.off-heap: true >> >> >> >> >> >> Also, I don't understand why this happens at all. I assumed that >> >> >> Flink >> >> >> won't overcommit allocated resources and will spill to the disk when >> >> >> running >> >> >> out of heap memory. Appreciate if someone can shed light on this >> >> >> too. >> >> >> >> >> >> Thanks, >> >> >> Timur >> >> > >> >> > >> > >> > > > |
Free forum by Nabble | Edit this page |