OutOfMemoryError in netty local transport


OutOfMemoryError in netty local transport

Robert Schmidtke
Hi everyone,

I'm constantly running into OutOfMemoryErrors and for the life of me I
cannot figure out what's wrong. Let me describe my setup. I'm running the
current master branch of Flink on YARN (Hadoop 2.7.0). My job is an
unfinished implementation of TPC-H Q2 (
https://github.com/robert-schmidtke/flink-benchmarks/blob/master/xtreemfs-flink-benchmark/src/main/java/org/xtreemfs/flink/benchmark/TPCH2Benchmark.java).
I run on 8 machines (1 for the JM, the other 7 for TMs) with 64G of memory
per machine. This is what I believe to be the relevant section of my
yarn-site.xml:


  <property>
    <name>yarn.nodemanager.resource.memory-mb</name>
    <value>57344</value>
  </property>

  <property>
    <name>yarn.scheduler.maximum-allocation-mb</name>
    <value>55296</value>
  </property>

  <property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
  </property>


And this is how I submit the job:


$FLINK_HOME/bin/flink run -m yarn-cluster -yjm 16384 -ytm 32768 -yn 7 .....


The TMs happily report:

.....
11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner    -  JVM Options:
11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner    -     -Xms24511m
11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner    -     -Xmx24511m
11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner    -     -XX:MaxDirectMemorySize=65m
.....
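
As an aside, the logged values happen to be consistent with the container
memory (-ytm 32768) being split into a YARN safety cutoff, the JVM heap, and
the direct-memory limit. A rough back-of-the-envelope sketch; the 25% cutoff
ratio and the 2048 x 32 KiB network buffer pool are assumed defaults, not
something stated in this thread:

public class TaskManagerMemoryMath {
    public static void main(String[] args) {
        int containerMb = 32 * 1024;                    // -ytm 32768
        int cutoffMb = containerMb / 4;                 // assumed 25% safety cutoff for YARN
        int networkBufferMb = 2048 * 32 / 1024;         // assumed 2048 buffers * 32 KiB = 64 MiB
        int directMb = networkBufferMb + 1;             // 65, matching the logged MaxDirectMemorySize
        int heapMb = containerMb - cutoffMb - directMb; // 32768 - 8192 - 65 = 24511

        System.out.println("-Xmx" + heapMb + "m");                        // -Xmx24511m, as logged
        System.out.println("-XX:MaxDirectMemorySize=" + directMb + "m");  // 65m, as logged
    }
}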


I've tried various combinations of YARN and Flink options, to no avail. I
always end up with the following stacktrace:


org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: java.lang.OutOfMemoryError: Direct buffer memory
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:153)
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
    at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
    at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
    at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
    at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
    at io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:737)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:310)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
    at java.lang.Thread.run(Thread.java:745)
Caused by: io.netty.handler.codec.DecoderException: java.lang.OutOfMemoryError: Direct buffer memory
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:234)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    ... 9 more
Caused by: java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:658)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
    at io.netty.buffer.UnpooledUnsafeDirectByteBuf.allocateDirect(UnpooledUnsafeDirectByteBuf.java:108)
    at io.netty.buffer.UnpooledUnsafeDirectByteBuf.capacity(UnpooledUnsafeDirectByteBuf.java:157)
    at io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:251)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:849)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:841)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:831)
    at io.netty.handler.codec.ByteToMessageDecoder$1.cumulate(ByteToMessageDecoder.java:92)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:228)
    ... 10 more


I always figured that running into OOMEs with Flink would be quite hard to
achieve, so I'm wondering what's going wrong here. It seems to be related to
direct memory. Why is it limited in the JVM options at all? Is there a place
where I can safely increase the size, or remove the option altogether so it
is unbounded?
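
For illustration, the error itself is just the JVM refusing to hand out NIO
direct buffers beyond -XX:MaxDirectMemorySize. A minimal standalone sketch
(nothing Flink-specific; class name and allocation size chosen arbitrarily)
reproduces it when run with, e.g., java -XX:MaxDirectMemorySize=65m DirectBufferOOME:

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class DirectBufferOOME {
    public static void main(String[] args) {
        List<ByteBuffer> buffers = new ArrayList<>();
        while (true) {
            // Each call reserves off-heap memory that counts against -XX:MaxDirectMemorySize;
            // once the limit is exceeded, java.nio.Bits.reserveMemory throws
            // java.lang.OutOfMemoryError: Direct buffer memory (the same root cause as above).
            buffers.add(ByteBuffer.allocateDirect(16 * 1024 * 1024));
            System.out.println("allocated " + buffers.size() * 16 + " MiB of direct memory");
        }
    }
}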

A note on the data sizes: I used a scale factor of 1000 for the dbgen
command of TPC-H, which effectively means the following. Each table is
split into 7 chunks (one local to each TM); each chunk of part.tbl is
734M, each chunk of supplier.tbl is 43M, and each chunk of partsupp.tbl is
3.6G. These are not excessive amounts of data, but the query (at least my
implementation) involves joins (the one in line 249 of the benchmark causes
the OOME), so maybe there are some network issues?

Maybe you can point me in the right direction. Thanks a bunch. Cheers.

Robert

Re: OutOfMemoryError in netty local transport

Maximilian Michels
Hi Robert,

This is a regression on the current master due to changes in the way
Flink calculates memory and sets the maximum direct memory size. We
introduced these changes when we merged support for off-heap memory. It
is not a problem with the way Flink handles managed memory; it is simply
that -XX:MaxDirectMemorySize is set too low. By default the direct memory
is only used by the network stack, and the network library we use (Netty)
allocates more direct memory than we expected.
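
To illustrate the mechanism, here is a rough standalone sketch against the
Netty 4 API (not Flink's actual code): the decoder accumulates incoming
bytes in an unpooled direct buffer whose size tracks the amount of
in-flight data, so it can outgrow a tight direct-memory budget regardless
of how the rest of the memory was sized.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;

public class CumulationGrowth {
    public static void main(String[] args) {
        // preferDirect = true: the cumulation buffer lives in direct (off-heap) memory.
        UnpooledByteBufAllocator alloc = new UnpooledByteBufAllocator(true);
        ByteBuf cumulation = alloc.directBuffer(1024);
        byte[] incoming = new byte[64 * 1024];
        for (int i = 0; i < 2048; i++) {
            // writeBytes -> ensureWritable -> capacity -> allocateDirect: the same call
            // path as in the "Caused by" frames of the stack trace above. With a limit
            // like -XX:MaxDirectMemorySize=65m this eventually fails with
            // java.lang.OutOfMemoryError: Direct buffer memory.
            cumulation.writeBytes(incoming);
        }
        System.out.println("cumulated " + cumulation.readableBytes() + " bytes off-heap");
        cumulation.release();
    }
}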

We'll push a fix to the master as soon as possible. Thank you for
reporting and thanks for your patience.

Best regards,
Max


Re: OutOfMemoryError in netty local transport

Robert Schmidtke
Hi Max,

thanks for your quick reply. I found the relevant code and commented it out for testing; it seems to be working. Happily waiting for the fix. Thanks again.

Robert

--
My GPG Key ID: 336E2680

Re: OutOfMemoryError in netty local transport

Maximilian Michels
Hi Robert,

Just a quick update: The issue has been resolved in the latest Maven
0.10-SNAPSHOT dependency.
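
For anyone consuming the fix via Maven instead of building Flink from
source, the Apache snapshot repository has to be enabled in the project's
pom.xml. Roughly (flink-java shown as an example; the other Flink modules
follow the same pattern):

  <repositories>
    <repository>
      <id>apache.snapshots</id>
      <url>https://repository.apache.org/content/repositories/snapshots/</url>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </repository>
  </repositories>

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>0.10-SNAPSHOT</version>
    </dependency>
  </dependencies>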

Cheers,
Max


Re: OutOfMemoryError in netty local transport

Robert Schmidtke
Sweet! I'll pull it straight away. Thanks!

--
My GPG Key ID: 336E2680

Re: OutOfMemoryError in netty local transport

Maximilian Michels
By the way, you might have to use the "-U" flag to force Maven to
update its dependencies:  mvn -U clean install -DskipTests


Re: OutOfMemoryError in netty local transport

Robert Schmidtke
I pulled the current master branch and rebuilt Flink completely anyway. Works like a charm.
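
For reference, a rough sketch of the equivalent commands (directory names
taken from the repositories mentioned earlier in the thread; the exact
layout may differ):

cd flink
git pull                           # pick up the fix from the current master
mvn clean install -DskipTests      # rebuild Flink and install it into the local Maven repo
cd ../flink-benchmarks/xtreemfs-flink-benchmark
mvn -U clean package               # rebuild the job, forcing a refresh of snapshot dependencies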

--
My GPG Key ID: 336E2680

Re: OutOfMemoryError in netty local transport

Maximilian Michels
Great to hear :)

>> >> >> >
>> >> >> > io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
>> >> >> > at
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
>> >> >> > at
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
>> >> >> > at
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:737)
>> >> >> > at
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:310)
>> >> >> > at
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>> >> >> > at
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
>> >> >> > at
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>> >> >> > at
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>> >> >> > at
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>> >> >> > at
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>> >> >> > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>> >> >> > at
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
>> >> >> > at java.lang.Thread.run(Thread.java:745)
>> >> >> > Caused by: io.netty.handler.codec.DecoderException:
>> >> >> > java.lang.OutOfMemoryError: Direct buffer memory
>> >> >> > at
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:234)
>> >> >> > at
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>> >> >> > ... 9 more
>> >> >> > Caused by: java.lang.OutOfMemoryError: Direct buffer memory
>> >> >> > at java.nio.Bits.reserveMemory(Bits.java:658)
>> >> >> > at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
>> >> >> > at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
>> >> >> > at
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > io.netty.buffer.UnpooledUnsafeDirectByteBuf.allocateDirect(UnpooledUnsafeDirectByteBuf.java:108)
>> >> >> > at
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > io.netty.buffer.UnpooledUnsafeDirectByteBuf.capacity(UnpooledUnsafeDirectByteBuf.java:157)
>> >> >> > at
>> >> >> >
>> >> >> >
>> >> >> > io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:251)
>> >> >> > at
>> >> >> >
>> >> >> > io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:849)
>> >> >> > at
>> >> >> >
>> >> >> > io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:841)
>> >> >> > at
>> >> >> >
>> >> >> > io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:831)
>> >> >> > at
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > io.netty.handler.codec.ByteToMessageDecoder$1.cumulate(ByteToMessageDecoder.java:92)
>> >> >> > at
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:228)
>> >> >> > ... 10 more
>> >> >> >
>> >> >> >
>> >> >> > I always figured that running into OOMEs with Flink would be quite
>> >> >> > hard
>> >> >> > to
>> >> >> > achieve, however I'm wondering what's going wrong now. Seems to be
>> >> >> > related
>> >> >> > to the Direct Memory? Why are you limiting it in the JVM options
>> >> >> > at
>> >> >> > all?
>> >> >> > Is
>> >> >> > there a special place where I can safely increase the size /
>> >> >> > remove
>> >> >> > the
>> >> >> > option altogether for unboundedness?
>> >> >> >
>> >> >> > A note on the data sizes, I used a scaling factor 1000 for the
>> >> >> > dbgen
>> >> >> > command
>> >> >> > of TPC-H, which effectively means the following. Each table is
>> >> >> > split
>> >> >> > in
>> >> >> > 7
>> >> >> > chunks (one local to each TM), each chunk of the part.tbl is 734M,
>> >> >> > each
>> >> >> > chunk of supplier.tbl is 43M, each chunk of partsupp.tbl is 3.6G.
>> >> >> > These
>> >> >> > are
>> >> >> > not excessive amounts of data, however the query (at least my
>> >> >> > implementation) involves joins (the one in line 249 causing the
>> >> >> > OOME)
>> >> >> > and
>> >> >> > maybe there are some network issues?
>> >> >> >
>> >> >> > Maybe you can point me into the right direction, thanks a bunch.
>> >> >> > Cheers.
>> >> >> >
>> >> >> > Robert
>> >> >
>> >> >
>> >> >
>> >> >
>> >> > --
>> >> > My GPG Key ID: 336E2680
>> >
>> >
>> >
>> >
>> > --
>> > My GPG Key ID: 336E2680
>
>
>
>
> --
> My GPG Key ID: 336E2680