Hi everyone,
I'm constantly running into OutOfMemoryErrors and for the life of me I cannot figure out what's wrong. Let me describe my setup. I'm running the current master branch of Flink on YARN (Hadoop 2.7.0). My job is an unfinished implementation of TPC-H Q2 (https://github.com/robert-schmidtke/flink-benchmarks/blob/master/xtreemfs-flink-benchmark/src/main/java/org/xtreemfs/flink/benchmark/TPCH2Benchmark.java). I run on 8 machines (1 for the JM, the other 7 for TMs) with 64G of memory per machine. This is what I believe to be the relevant section of my yarn-site.xml:

<property>
  <name>yarn.nodemanager.resource.memory-mb</name>
  <value>57344</value>
</property>
<property>
  <name>yarn.scheduler.maximum-allocation-mb</name>
  <value>55296</value>
</property>
<property>
  <name>yarn.nodemanager.vmem-check-enabled</name>
  <value>false</value>
</property>

And this is how I submit the job:

$FLINK_HOME/bin/flink run -m yarn-cluster -yjm 16384 -ytm 32768 -yn 7 .....

The TMs happily report:

.....
11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner  - JVM Options:
11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner  -    -Xms24511m
11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner  -    -Xmx24511m
11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner  -    -XX:MaxDirectMemorySize=65m
.....

I've tried various combinations of YARN and Flink options, to no avail. I always end up with the following stack trace:

org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: java.lang.OutOfMemoryError: Direct buffer memory
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:153)
	at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
	at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
	at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
	at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
	at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
	at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
	at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
	at io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:737)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:310)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
	at java.lang.Thread.run(Thread.java:745)
Caused by: io.netty.handler.codec.DecoderException: java.lang.OutOfMemoryError: Direct buffer memory
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:234)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	... 9 more
Caused by: java.lang.OutOfMemoryError: Direct buffer memory
	at java.nio.Bits.reserveMemory(Bits.java:658)
	at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
	at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
	at io.netty.buffer.UnpooledUnsafeDirectByteBuf.allocateDirect(UnpooledUnsafeDirectByteBuf.java:108)
	at io.netty.buffer.UnpooledUnsafeDirectByteBuf.capacity(UnpooledUnsafeDirectByteBuf.java:157)
	at io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:251)
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:849)
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:841)
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:831)
	at io.netty.handler.codec.ByteToMessageDecoder$1.cumulate(ByteToMessageDecoder.java:92)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:228)
	... 10 more

I always figured that running into OOMEs with Flink would be quite hard to achieve, but I'm wondering what's going wrong now. It seems to be related to the direct memory? Why are you limiting it in the JVM options at all? Is there a special place where I can safely increase the size or remove the option altogether for unboundedness?

A note on the data sizes: I used a scaling factor of 1000 for the dbgen command of TPC-H, which effectively means the following. Each table is split into 7 chunks (one local to each TM); each chunk of part.tbl is 734M, each chunk of supplier.tbl is 43M, and each chunk of partsupp.tbl is 3.6G. These are not excessive amounts of data, but the query (at least my implementation) involves joins (the one in line 249 causing the OOME), so maybe there are some network issues?

Maybe you can point me in the right direction, thanks a bunch. Cheers.

Robert
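For context, the -XX:MaxDirectMemorySize flag seen in the TM logs caps java.nio direct allocations independently of the heap. The following is a minimal, hypothetical demo of the failure mode in the stack trace (not part of the benchmark job, just an illustration); started with something like -XX:MaxDirectMemorySize=65m, the allocations fail long before -Xmx is anywhere near exhausted:

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class DirectMemoryLimitDemo {
    public static void main(String[] args) {
        List<ByteBuffer> buffers = new ArrayList<>();
        long allocatedMb = 0;
        try {
            while (true) {
                // Bounded by -XX:MaxDirectMemorySize, not by -Xmx; this is the same
                // java.nio.Bits.reserveMemory path that appears in the stack trace.
                buffers.add(ByteBuffer.allocateDirect(1024 * 1024)); // 1 MiB per buffer
                allocatedMb++;
            }
        } catch (OutOfMemoryError e) {
            System.out.println("Direct buffer memory exhausted after ~" + allocatedMb + " MiB: " + e);
        }
    }
}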
Hi Robert,
This is a regression on the current master due to changes in the way Flink calculates the memory and sets the maximum direct memory size. We introduced these changes when we merged support for off-heap memory. This is not a problem in the way Flink deals with managed memory; it's just that -XX:MaxDirectMemorySize is set too low. By default, the maximum direct memory is only used by the network stack, and the network library we use (Netty) allocates more direct memory than we expected.

We'll push a fix to the master as soon as possible. Thank you for reporting and thanks for your patience.

Best regards,
Max
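To make the sizing issue concrete, here is a purely illustrative sketch (all numbers are assumptions, and this is not Flink's actual sizing code) of how a task manager container budget splits into heap and direct memory, and why a direct-memory cap that only covers the expected buffer budget starves Netty:

public class TaskManagerMemorySketch {
    public static void main(String[] args) {
        long containerMb = 32768;        // the -ytm 32768 from the job submission
        long heapMb = 24511;             // the -Xmx value reported in the TM logs

        // Hypothetical direct-memory demand: a network buffer budget plus the extra
        // direct memory Netty allocates internally (arenas, cumulation buffers while
        // decoding), which is the part that was under-budgeted.
        long networkBufferBudgetMb = 64; // assumed, e.g. 2048 buffers * 32 KiB
        long nettyOverheadMb = 128;      // assumed headroom for Netty internals
        long directNeededMb = networkBufferBudgetMb + nettyOverheadMb;

        long directConfiguredMb = 65;    // what the TMs were actually started with

        System.out.printf("container=%dm, heap=%dm, direct configured=%dm, direct needed~%dm%n",
                containerMb, heapMb, directConfiguredMb, directNeededMb);
        if (directConfiguredMb < directNeededMb) {
            System.out.println("=> Netty's extra direct allocations hit the cap: "
                    + "java.lang.OutOfMemoryError: Direct buffer memory");
        }
    }
}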
Hi Max,
thanks for your quick reply. I found the relevant code and commented it out for testing; it seems to be working. Happily waiting for the fix. Thanks again.

Robert
Hi Robert,
Just a quick update: the issue has been resolved in the latest Maven 0.10-SNAPSHOT dependency.

Cheers,
Max
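For anyone who wants to pick up the fix through Maven rather than building from source, here is a sketch of the dependency setup (the artifact names and repository URL are assumptions; adjust them to your project and Scala setup):

<!-- Sketch only: coordinates and repository URL are assumed, adjust to your build. -->
<repositories>
  <repository>
    <id>apache.snapshots</id>
    <url>https://repository.apache.org/content/repositories/snapshots/</url>
    <snapshots>
      <enabled>true</enabled>
    </snapshots>
  </repository>
</repositories>

<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>0.10-SNAPSHOT</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>0.10-SNAPSHOT</version>
  </dependency>
</dependencies>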
Sweet! I'll pull it straight away. Thanks!
By the way, you might have to use the "-U" flag to force Maven to
update its dependencies: mvn -U clean install -DskipTests
I pulled the current master branch and rebuilt Flink completely anyway. Works like a charm.
Great to hear :)