(no subject)

Jakob Ericsson
Hello,

We are running into a strange problem with direct memory buffers. As far as I know, we are not using any direct memory buffers in our own code.
This is a pretty trivial streaming application that just deduplicates and unions some Kafka streams.
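
For reference, the shape of the job is roughly the following (a minimal, self-contained sketch with placeholder sources, not our actual code):

import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.HashSet;
import java.util.Set;

public class DedupUnionJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // In the real job these are Kafka consumers; fromElements keeps the sketch runnable.
        DataStream<String> a = env.fromElements("k1", "k2", "k1");
        DataStream<String> b = env.fromElements("k2", "k3");

        a.union(b)
         .filter(new RichFilterFunction<String>() {
             // Toy in-memory dedup state; not suitable for a real distributed job.
             private final Set<String> seen = new HashSet<String>();
             @Override
             public boolean filter(String key) {
                 return seen.add(key); // keep first occurrence, drop repeats
             }
         })
         .print();

        env.execute("dedup-union sketch");
    }
}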

/Jakob



2015-10-19 13:27:59,064 INFO  org.apache.flink.runtime.taskmanager.Task                     - FilterAndTransform -> (Filter, Filter) (3/4) switched to FAILED with exception.
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: java.lang.OutOfMemoryError: Direct buffer memory
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:153)
        at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
        at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
        at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
        at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
        at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
        at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
        at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
        at io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:737)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:310)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
        at java.lang.Thread.run(Thread.java:745)
Caused by: io.netty.handler.codec.DecoderException: java.lang.OutOfMemoryError: Direct buffer memory
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:234)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        ... 9 more
Caused by: java.lang.OutOfMemoryError: Direct buffer memory
        at java.nio.Bits.reserveMemory(Bits.java:658)
        at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
        at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
        at io.netty.buffer.PoolArena$DirectArena.newUnpooledChunk(PoolArena.java:651)
        at io.netty.buffer.PoolArena.allocateHuge(PoolArena.java:237)
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:215)
        at io.netty.buffer.PoolArena.reallocate(PoolArena.java:358)
        at io.netty.buffer.PooledByteBuf.capacity(PooledByteBuf.java:111)
        at io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:251)
        at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:849)
        at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:841)
        at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:831)
        at io.netty.handler.codec.ByteToMessageDecoder$1.cumulate(ByteToMessageDecoder.java:92)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:228)
        ... 10 more

Re:

Maximilian Michels
Hi Jakob,

Thank you for reporting this. Could you please post your configuration here? In particular, please tell us the values of the following configuration variables:

taskmanager.heap.mb
taskmanager.network.numberOfBuffers
taskmanager.memory.off-heap
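
For reference, these are set in conf/flink-conf.yaml, for example (example values only, not recommendations):

taskmanager.heap.mb: 1024
taskmanager.network.numberOfBuffers: 2048
taskmanager.memory.off-heap: false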

Are you running the Flink cluster in batch or streaming mode?

Direct memory is used by Flink's network layer. My guess is that you have set taskmanager.heap.mb too low (it constrains the amount of direct memory at the moment).
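
If you ever need more direct memory than the startup script computes, you can in principle also raise the JVM limit explicitly via env.java.opts (illustrative value only; normally the script sets this for you):

env.java.opts: -XX:MaxDirectMemorySize=512m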

Thank you,
Max


Re:

Jakob Ericsson
Hi,

See answers below.

/Jakob

On Mon, Oct 19, 2015 at 4:03 PM, Maximilian Michels <[hidden email]> wrote:

> taskmanager.heap.mb

taskmanager.heap.mb: 2048

> taskmanager.network.numberOfBuffers

Default value. Not changed.

> taskmanager.memory.off-heap

Default value. Not changed.

> Are you running the Flink cluster in batch or streaming mode?

Started in streaming mode, running with two nodes in the cluster.
Also, I have set "env.java.opts: -XX:+UseConcMarkSweepGC" due to some strange Java core dumps with the G1 GC.
 

Re:

Maximilian Michels
Hi Jakob,

Thanks. Flink allocates its network memory as direct memory outside the normal Java heap. By default, that is 64 MB, but it can grow up to 128 MB under heavy network transfer. How much memory does your machine have? Could it be that your upper memory bound is lower than 2048 + 128 MB?
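
For reference, the 64 MB figure follows from the defaults (numbers from memory, please double-check for your version): 2048 network buffers at 32 KB each, i.e. 2048 * 32768 bytes = 64 MB of direct memory for the network stack.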

Best,
Max

Re:

Maximilian Michels
I forgot to ask you: Which version of Flink are you using? 0.9.1 or
0.10-SNAPSHOT?

Re:

Gyula Fóra
It's 0.10-SNAPSHOT

Gyula

Re:

Maximilian Michels
When was the last time you updated your 0.10-SNAPSHOT Flink cluster?
If it has been more than a couple of weeks, then I'd advise you to
update to the latest snapshot version. There has been an issue with
the calculation of the off-heap memory limit in the past.

Thanks,
Max

Re:

Maximilian Michels
You can see the revision number and the build date in the JobManager
log file, e.g. "Starting JobManager (Version: 0.10-SNAPSHOT,
Rev:1b79bc1, Date:18.10.2015 @ 20:15:08 CEST)"
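
A quick way to find it in a standalone setup (assuming the default log location and file naming):

grep "Starting JobManager" log/flink-*-jobmanager-*.log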

Re:

Jakob Ericsson
The revision is "Starting JobManager (Version: 0.10-SNAPSHOT, Rev:c82ebbf, Date:15.10.2015 @ 11:34:01 CEST)"

We have a lot of memory left on the machine. I have increased it quite a lot.

What are your thoughts on memory configuration?
If I understand Flink correctly, you should only run one taskmanager per host?

For a pretty standard machine with 16 cores and 32-64 GB of memory, this means one Java process running with -Xmx30g or even higher to exhaust all the memory of the machine. This is, at least for the CMS GC, not the most optimal configuration.
It might be viable for G1, but we got some really serious Java core dumps when running G1.

I looked a bit at the flags that were set on the process, and it seems that -Xmx and -XX:MaxDirectMemorySize are set to the same value by the shell script.
When I got the "java.lang.OutOfMemoryError: Direct buffer memory", I was running with taskmanager.heap.mb: 2048, so the direct memory limit was also 2 GB.

I have restarted the process with G1 again and 20 GB as taskmanager.heap.mb. Let's see if it stays stable during the night.


Re:

Maximilian Michels
Hi Jakob,

Your revision is fairly new, and your direct memory configuration
looks correct for your setup. If you have time, you could verify that
the startup script sets the JVM memory flags correctly; they are
printed in the first lines of the task manager log. With direct
memory set to 2GB and the default number of network buffers, the JVM
should have had enough direct memory. Still, we'd like to find out
what caused your problem.
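
If that is quicker than digging through the log, you can also inspect
the running process directly; two standard ways (the PID is a
placeholder):

  ps -ef | grep org.apache.flink.runtime.taskmanager.TaskManager
  # or, with the JDK tools:
  jcmd <taskmanager-pid> VM.flags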

Are you running on YARN or standalone?

Yes, the usual setup is one task manager per host/VM. The task
manager allocates all of its memory upfront, but a large part of that
memory is self-managed by Flink and not touched much by the GC. By
default this is 0.7 of the configured heap memory; you can control
the ratio with the taskmanager.memory.fraction variable, or set a
fixed managed memory size (in MB) with taskmanager.memory.size. In
large memory setups we have seen slightly better performance with
off-heap memory allocation, which can be enabled with
taskmanager.memory.off-heap: true.
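
For reference, the corresponding flink-conf.yaml entries would look
something like this (the values are illustrative, not
recommendations):

  # fraction of the heap given to Flink's managed memory (default: 0.7)
  taskmanager.memory.fraction: 0.7
  # or, instead of a fraction, a fixed managed memory size in MB:
  # taskmanager.memory.size: 2048
  # allocate the managed memory off-heap
  taskmanager.memory.off-heap: true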

Please let us know if you experience any further issues.

Best,
Max

Re:

Stephan Ewen
@Jakob: If you run Flink standalone (not through YARN), one thing to be aware of is that the relevant change is in the bash scripts that start the cluster, not in the code. If you upgraded Flink by only copying a newer JAR file, you missed the updated scripts and, with them, the fix for that issue.
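
In other words, the scripts derive both JVM limits from taskmanager.heap.mb. As a rough sketch (not the real bin/taskmanager.sh, just an illustration of the flags you should end up seeing):

  TM_HEAP_MB=2048   # taskmanager.heap.mb from flink-conf.yaml
  JVM_ARGS="-Xms${TM_HEAP_MB}M -Xmx${TM_HEAP_MB}M -XX:MaxDirectMemorySize=${TM_HEAP_MB}M"
  echo java ${JVM_ARGS} org.apache.flink.runtime.taskmanager.TaskManager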

Re:

Jakob Ericsson
Hi,

We are running standalone Flink at the moment.
Our snapshot build is built from source, and I removed everything from the old build before starting the server, so there shouldn't be any traces of the old installation left.

I bumped the process to 20GB and it has been running stable, again with G1.
There might still be some kind of memory leak that I just don't see at the moment because of the large heap.
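
The relevant flink-conf.yaml entries are now roughly the following (a sketch; the G1 flag matches the command line below):

  taskmanager.heap.mb: 20480
  env.java.opts: -XX:+UseG1GC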

This is how the taskmanager process looks right now.

java -XX:+UseG1GC -Xms20480M -Xmx20480M -XX:MaxDirectMemorySize=20480M \
  -Dlog.file=/appsdir/flink-0.10-SNAPSHOT/log/flink-splat-taskmanager-0-fbweb608.log \
  -Dlog4j.configuration=file:/appsdir/flink-0.10-SNAPSHOT/conf/log4j.properties \
  -Dlogback.configurationFile=file:/appsdir/flink-0.10-SNAPSHOT/conf/logback.xml \
  -classpath /appsdir/flink-0.10-SNAPSHOT/lib/flink-dist-0.10-SNAPSHOT.jar:/appsdir/flink-0.10-SNAPSHOT/lib/flink-python-0.10-SNAPSHOT.jar:/appsdir/flink-0.10-SNAPSHOT/lib/log4j-1.2.17.jar:/appsdir/flink-0.10-SNAPSHOT/lib/slf4j-log4j12-1.7.7.jar::: \
  org.apache.flink.runtime.taskmanager.TaskManager \
  --configDir /appsdir/flink-0.10-SNAPSHOT/conf --streamingMode streaming




/Jakob
