Off heap memory issue

Lopez, Javier
Hi all,

We are starting a lot of Flink streaming jobs, and after 200 or more jobs have been started, the non-heap memory of the TaskManagers grows so much that the instances get killed. We found that every time we start a new job, the committed non-heap memory increases by 5 to 10 MB. Is this expected behavior? Are there ways to prevent it?
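One way to quantify the growth described above, sketched in Java under the assumption that the code runs inside (or is adapted to query) the TaskManager JVM: read the committed non-heap memory from the standard `MemoryMXBean` before and after a job submission, and project the per-job increase. The `NonHeapGrowth` class and its method names are hypothetical; only the `java.lang.management` calls are JDK API. At the reported 5 to 10 MB per job, 200 jobs already add up to roughly 1 to 2 GB.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

// Hypothetical helper: samples committed non-heap memory and projects growth per job.
public class NonHeapGrowth {

    // Committed non-heap memory of the current JVM in bytes (Metaspace, code cache, ...).
    static long committedNonHeapBytes() {
        MemoryUsage usage = ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage();
        return usage.getCommitted();
    }

    // Projected extra non-heap memory (MB) after starting `jobs` jobs at `perJobMb` each.
    static long projectedGrowthMb(int jobs, long perJobMb) {
        return jobs * perJobMb;
    }

    public static void main(String[] args) {
        System.out.printf("committed non-heap: %d MB%n", committedNonHeapBytes() / (1024 * 1024));
        // Reported range: 5 to 10 MB per job, 200 jobs.
        System.out.printf("200 jobs at 5 MB:  %d MB%n", projectedGrowthMb(200, 5));
        System.out.printf("200 jobs at 10 MB: %d MB%n", projectedGrowthMb(200, 10));
    }
}
```

Taking one sample before and one after a submission, and dividing the difference by the number of jobs started in between, gives the per-job figure to compare against the 5 to 10 MB observed here.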
Re: Off heap memory issue

rmetzger0
Hi Javier,

I'm not aware of such issues with Flink, but if you could give us some more details on your setup, I might get some more ideas on what to look for.

Are you using the RocksDBStateBackend? (RocksDB does some JNI allocations that could potentially leak memory.)
Also, are you passing any special garbage collector options? (Maybe some classes are not being unloaded.)
Are you using anything else special (such as Protobuf or Avro formats, or any other big library)?
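To answer the GC-options question quickly, the flags a running JVM was started with are available through `RuntimeMXBean.getInputArguments()`. The sketch below filters them for options that commonly influence garbage collection or class unloading; HotSpot's `-Xnoclassgc` and `-XX:-ClassUnloading`, for example, disable class unloading. The `JvmFlagCheck` class and its marker list are illustrative assumptions, not an exhaustive check.

```java
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: lists JVM flags that commonly affect GC or class unloading.
public class JvmFlagCheck {

    // Substrings treated as "interesting"; an illustrative assumption, not exhaustive.
    private static final String[] MARKERS = {
        "GC", "ClassUnloading", "noclassgc", "Metaspace", "CompressedClassSpace"
    };

    static List<String> gcRelatedFlags(List<String> jvmArgs) {
        List<String> hits = new ArrayList<>();
        for (String arg : jvmArgs) {
            for (String marker : MARKERS) {
                if (arg.contains(marker)) {
                    hits.add(arg);
                    break; // report each argument only once
                }
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        // Arguments passed to this JVM itself (not the main-class arguments).
        List<String> jvmArgs = ManagementFactory.getRuntimeMXBean().getInputArguments();
        for (String flag : gcRelatedFlags(jvmArgs)) {
            System.out.println(flag);
        }
    }
}
```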

Regards,
Robert



On Mon, Aug 28, 2017 at 5:04 PM, Javier Lopez <[hidden email]> wrote:
Re: Off heap memory issue

rmetzger0
I just saw that your other email is about the same issue.

Since you've already taken a heap dump, did you see any pattern in the allocated objects? Ideally, none of the classes from your user code should stick around when no job is running.
What's the size of the heap dump? I'm happy to take a look at it if it's reasonably small.
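For taking the heap dump mentioned above: besides `jmap -dump:live,format=b,file=heap.hprof <pid>`, a HotSpot JVM can dump its own heap through `HotSpotDiagnosticMXBean`. A minimal sketch follows; the class name and file location are hypothetical. With `live = true` only reachable objects are written, which typically triggers a full GC first, so this is best done on an idle TaskManager.

```java
import com.sun.management.HotSpotDiagnosticMXBean;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch: write a heap dump of the current JVM (HotSpot-specific MXBean).
public class HeapDumper {

    // Dumps the heap to `target`; the file must not exist yet.
    // live = true restricts the dump to reachable objects.
    static void dump(Path target, boolean live) throws IOException {
        HotSpotDiagnosticMXBean bean =
            ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
        bean.dumpHeap(target.toString(), live);
    }

    public static void main(String[] args) throws IOException {
        // Some JDK versions reject names without the ".hprof" suffix, so always use it.
        Path target = Files.createTempDirectory("heapdump").resolve("flink-tm.hprof");
        dump(target, true);
        System.out.println("heap dump written to " + target + " (" + Files.size(target) + " bytes)");
    }
}
```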

On Wed, Aug 30, 2017 at 10:27 AM, Robert Metzger <[hidden email]> wrote:
Re: Off heap memory issue

Lopez, Javier
Hi Robert,

Sorry for the late reply. We did a lot of tests to identify whether the problem was in our custom sources/sinks, and found that none of our custom components causes it. We came up with a small test and realized that the Flink nodes run out of non-heap JVM memory and crash after thousands of jobs have been deployed.

When rapidly deploying thousands or hundreds of thousands of Flink jobs (depending on how resource-intensive the jobs are), the non-heap JVM memory consumption of the Flink nodes grows until there is no memory left on the machine and the Flink process crashes. Both the TaskManagers and the JobManager exhibit this behavior, though the TaskManagers die faster. Memory consumption doesn't decrease after we stop deploying new jobs, even with the cluster idle (no running jobs).

We could replicate the behavior by rapidly deploying the WordCount job from the Quickstart with a Python script, running 24 instances of the deployment script in parallel.

Non-heap JVM memory consumption grows faster with more complex jobs, e.g. reading 10K events from Kafka and printing them to STDOUT (*), so fewer deployed jobs are needed before the TaskManagers/JobManager die.

We run Flink 1.3.2 in standalone mode inside Docker containers on AWS EC2 t2.large nodes with 4GB RAM. For the test, we used 2 TaskManagers and 1 JobManager.

(*) A slightly modified Python script was used, which after each deployment waited 15 seconds for the 10K events to be read from Kafka and then canceled the freshly deployed job via the Flink REST API.

If you want, we can provide the scripts and jobs we used for this test. As a workaround, we restart the Flink nodes once a memory threshold is reached, but this has lowered the availability of our services.

Thanks for your help.

On 30 August 2017 at 10:39, Robert Metzger <[hidden email]> wrote:
Re: Off heap memory issue

Flavio Pompermaier
We also faced the same problem, but the number of jobs we can run before having to restart the cluster depends on the volume of data shuffled over the network. We even had problems with a single job, and to avoid OOM issues we had to add some configuration to limit Netty memory usage:
 - Add to flink-conf.yaml -> env.java.opts: -Dio.netty.recycler.maxCapacity.default=1
 - Edit taskmanager.sh and change TM_MAX_OFFHEAP_SIZE from 8388607T to 5g
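For context on the second bullet: in the Flink 1.3 startup scripts, TM_MAX_OFFHEAP_SIZE appears to be passed to the TaskManager JVM as `-XX:MaxDirectMemorySize`, so the default `8388607T` effectively means "unlimited" (8388607 TB is 2^63 - 2^40 bytes, just under `Long.MAX_VALUE`). The sketch below, which assumes that mapping, reads the flag back from a running JVM's arguments and converts HotSpot-style size suffixes to bytes, so you can confirm the `5g` change actually took effect. `DirectMemoryLimit`, `parseSize`, and `maxDirectMemory` are hypothetical names.

```java
import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.Locale;

// Hypothetical helper: finds -XX:MaxDirectMemorySize among the JVM arguments.
public class DirectMemoryLimit {

    private static final String FLAG = "-XX:MaxDirectMemorySize=";

    // Converts a HotSpot-style size ("5g", "512m", "8388607T", "1024") to bytes.
    static long parseSize(String value) {
        String v = value.trim().toLowerCase(Locale.ROOT);
        char unit = v.charAt(v.length() - 1);
        int shift;
        switch (unit) {
            case 'k': shift = 10; break;
            case 'm': shift = 20; break;
            case 'g': shift = 30; break;
            case 't': shift = 40; break;
            default:  return Long.parseLong(v); // plain byte count
        }
        long number = Long.parseLong(v.substring(0, v.length() - 1));
        return Math.multiplyExact(number, 1L << shift); // throws on overflow
    }

    // Returns the configured limit in bytes, or -1 if the flag is absent.
    static long maxDirectMemory(List<String> jvmArgs) {
        long result = -1;
        for (String arg : jvmArgs) {
            if (arg.startsWith(FLAG)) {
                result = parseSize(arg.substring(FLAG.length())); // keep the last occurrence
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> jvmArgs = ManagementFactory.getRuntimeMXBean().getInputArguments();
        System.out.println("MaxDirectMemorySize: " + maxDirectMemory(jvmArgs) + " bytes");
    }
}
```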

For this purpose we wrote a small test to reproduce the problem and opened an issue for it [1].
However, we still don't know whether the problems are related.

I hope this is helpful,

On Wed, Oct 18, 2017 at 10:48 AM, Javier Lopez <[hidden email]> wrote:
Re: Off heap memory issue

Kien Truong

Hi,

We saw a similar issue in one of our jobs, caused by a ByteBuffer memory leak [1].

We fixed it using the solution in the article, i.e. setting -Djdk.nio.maxCachedBufferSize.

This property is available since Java 8u102.
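To check whether direct (off-heap) buffers are what is growing, the JDK exposes the "direct" buffer pool through `BufferPoolMXBean`. The mechanism described in the linked article is that the JDK caches a temporary direct buffer per thread when heap `ByteBuffer`s are used for channel I/O, and `jdk.nio.maxCachedBufferSize` caps the size of buffers kept in that cache. The sketch below only reports the pool statistics and the property's current value; `DirectPoolStats` is a hypothetical name, and the flag value in the comment is illustrative.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;

// Sketch: report direct-buffer pool usage and the jdk.nio.maxCachedBufferSize setting.
public class DirectPoolStats {

    // Bytes currently used by the "direct" buffer pool, or -1 if the pool is not exposed.
    static long directMemoryUsed() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Null unless the JVM was started with e.g. -Djdk.nio.maxCachedBufferSize=262144
        System.out.println("jdk.nio.maxCachedBufferSize = "
            + System.getProperty("jdk.nio.maxCachedBufferSize"));

        long before = directMemoryUsed();
        ByteBuffer buffer = ByteBuffer.allocateDirect(1 << 20); // 1 MiB off-heap
        long after = directMemoryUsed();
        System.out.printf("direct pool: %d -> %d bytes (buffer capacity %d)%n",
            before, after, buffer.capacity());
    }
}
```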

Best regards,

Kien

[1] http://www.evanjones.ca/java-bytebuffer-leak.html


On 10/18/2017 4:06 PM, Flavio Pompermaier wrote:
Re: Off heap memory issue

Flavio Pompermaier
Unfortunately, the issue I opened [1] was not a Flink problem; it was just caused by an ever-increasing job plan.
So no help from that. Let's hope we find the real source of the problem.
Maybe using -Djdk.nio.maxCachedBufferSize could help (but I haven't tried it yet).

Best,

On Wed, Oct 18, 2017 at 2:07 PM, Kien Truong <[hidden email]> wrote:

--
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 041809
Re: Off heap memory issue

Piotr Nowojski
Hi,

I have been able to observe some off-heap memory “issues” by submitting the Kafka job provided by Javier Lopez (in a different mailing thread).

TL;DR:

There was no memory leak; the “Metaspace” and “Compressed Class Space” memory pools just grow over time and are only rarely garbage collected. In my test case, together they were wasting up to ~7GB of memory, while the test could run with as little as ~100MB. Connect to your JVM with, for example, jconsole, check the size of these pools, and cut it in half by setting:

env.java.opts: -XX:CompressedClassSpaceSize=***M -XX:MaxMetaspaceSize=***M

in flink-conf.yaml. Everything works fine but memory consumption is still too high? Rinse and repeat.
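What jconsole shows for these two pools can also be read programmatically through `MemoryPoolMXBean`. The sketch below assumes a HotSpot JVM, where the pools are named "Metaspace" and "Compressed Class Space"; it prints their committed size and derives a halved `env.java.opts` value following the rule of thumb above. The class and method names are hypothetical, and halving is only a starting point for the rinse-and-repeat loop, not a recommended value.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryType;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: read Metaspace / Compressed Class Space and suggest halved limits (HotSpot pool names).
public class MetaspaceCheck {

    // Committed bytes per non-heap pool, keyed by pool name.
    static Map<String, Long> nonHeapCommitted() {
        Map<String, Long> result = new LinkedHashMap<>();
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            if (pool.getType() == MemoryType.NON_HEAP) {
                result.put(pool.getName(), pool.getUsage().getCommitted());
            }
        }
        return result;
    }

    // Builds the env.java.opts value with each limit set to half the observed size (in MB).
    static String halvedOpts(long compressedClassMb, long metaspaceMb) {
        return "-XX:CompressedClassSpaceSize=" + (compressedClassMb / 2) + "M"
            + " -XX:MaxMetaspaceSize=" + (metaspaceMb / 2) + "M";
    }

    public static void main(String[] args) {
        Map<String, Long> pools = nonHeapCommitted();
        pools.forEach((name, bytes) -> System.out.printf("%-25s %6d MB%n", name, bytes >> 20));

        long ccsMb = pools.getOrDefault("Compressed Class Space", 0L) >> 20;
        long metaMb = pools.getOrDefault("Metaspace", 0L) >> 20;
        if (ccsMb == 0 || metaMb == 0) {
            System.out.println("pools missing or too small to halve; check the JVM vendor/flags");
        } else {
            System.out.println("env.java.opts: " + halvedOpts(ccsMb, metaMb));
        }
    }
}
```

If a limit ends up below what the jobs actually need, the JVM fails with `java.lang.OutOfMemoryError: Metaspace` (or `Compressed class space`), so it is safer to lower the values step by step while watching the pools.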


Long story:

With default settings and a max heap size of 1GB, the consumption of the non-heap memory pools “Metaspace” and “Compressed Class Space” grew over time, seemingly indefinitely, and Metaspace was always around ~6 times larger than Compressed Class Space. The default max Metaspace size is unlimited, while “Compressed Class Space” has a default max size of 1GB.

When I decreased CompressedClassSpaceSize to 100MB, its memory consumption grew to 90MB and then started bouncing up and down by a couple of MB. “Metaspace” followed the same pattern, but used ~600MB. When I decreased MaxMetaspaceSize to 200MB, the memory consumption of both pools was bouncing around ~220MB.

There seem to be no general guidelines for configuring these values, since they are heavily application dependent. However, this looks like the most likely cause of the apparent off-heap “memory leak” that has been reported a couple of times in use cases where users submit hundreds or thousands of jobs to a Flink cluster. For more information please check here:


Please let us know if this solves your issues.

Thanks, Piotrek

On 13 Nov 2017, at 16:06, Flavio Pompermaier <[hidden email]> wrote:

Re: Off heap memory issue

Lopez, Javier
Hi Piotr,

Sorry for the late response, I'm out of the office with limited access to the Internet. I think we are on the right path to solving this problem. Some time ago we did a memory analysis of 3 different clusters we use: two of them run jobs 24/7, and the other is the one deploying thousands of jobs. All of those clusters show the same behavior for char and byte arrays (as expected), but for the class "java.lang.Class", the clusters running 24/7 jobs have fewer than 20K instances, whereas the other cluster has 383,120 instances. I don't know if this is related.

I hope we can test this soon, and will let you know if this fixes the problem.
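The `java.lang.Class` instance count from a heap dump roughly tracks how many classes are loaded, and the JVM also exposes this directly through `ClassLoadingMXBean`. In the sketch below (class name hypothetical), a "total loaded" count that keeps rising with each submitted job while "unloaded" stays flat would suggest that job classes are not being unloaded, which would be consistent with the Metaspace growth Piotr describes.

```java
import java.lang.management.ClassLoadingMXBean;
import java.lang.management.ManagementFactory;

// Sketch: snapshot of class loading/unloading counters for the current JVM.
public class ClassCounts {

    static long[] snapshot() {
        ClassLoadingMXBean bean = ManagementFactory.getClassLoadingMXBean();
        return new long[] {
            bean.getLoadedClassCount(),      // classes currently loaded
            bean.getTotalLoadedClassCount(), // classes loaded since JVM start
            bean.getUnloadedClassCount()     // classes unloaded since JVM start
        };
    }

    public static void main(String[] args) {
        long[] s = snapshot();
        System.out.printf("loaded now: %d, total loaded: %d, unloaded: %d%n", s[0], s[1], s[2]);
    }
}
```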

Thanks.


On 15 November 2017 at 13:18, Piotr Nowojski <[hidden email]> wrote:
Hi,

I have been able to observe some off heap memory “issues” by submitting Kafka job provided by Javier Lopez (in different mailing thread). 

TL;DR;

There was no memory leak, just memory pool “Metaspace” and “Compressed Class Space” are growing in size over time and are only rarely garbage collected. In my test case they together were wasting up to ~7GB of memory, while my test case could use as little as ~100MB. Connect with for example jconsole to your JVM, check their size and cut their size by half by setting:

env.java.opts: -XX:CompressedClassSpaceSize=***M -XX:MaxMetaspaceSize=***M

In flink-conf.yaml. Everything works fine and memory consumption still too high? Rinse and repeat.


Long story:

In default settings, with max heap size of 1GB, off heap memory consumption, memory consumption off non-heap memory pools of “Metaspace” and “Compressed Class Space” was growing in time which seemed like indefinitely, and Metaspace was always around ~6 times larger compared to compressed class space. Default max meatspace size is unlimited, while “Compressed class space” has a default max size of 1GB. 

When I decreased the CompressedClassSpaceSize down to 100MB, memory consumption grew up to 90MB and then it started bouncing up and down by couple of MB. “Metaspace” was following the same pattern, but using ~600MB. When I decreased down MaxMetaspaceSize to 200MB, memory consumption of both pools was bouncing around ~220MB.

It seems like there are no general guide lines how to configure those values, since it’s heavily application dependent. However this seems like the most likely suspect of the apparent OFF HEAP “memory leak” that was reported couple of times in use cases where users are submitting hundreds/thousands of jobs to Flink cluster. For more information please check here:


Please let us know if this solves your issues.

Thanks, Piotrek

On 13 Nov 2017, at 16:06, Flavio Pompermaier <[hidden email]> wrote:

Unfortunately the issue I've opened [1] was not a problem of Flink but was just caused by an ever increasing job plan.
So no help from that..Let's hope to find out the real source of the problem.
Maybe using  -Djdk.nio.maxCachedBufferSize could help (but I didn't try it yet)

Best,

On Wed, Oct 18, 2017 at 2:07 PM, Kien Truong <[hidden email]> wrote:

Hi,

We saw a similar issue in one of our job due to ByteBuffer memory leak[1].

We fixed it using the solution in the article, setting -Djdk.nio.maxCachedBufferSize

This variable is available for Java > 8u102

Best regards,

Kien

[1]http://www.evanjones.ca/java-bytebuffer-leak.html


On 10/18/2017 4:06 PM, Flavio Pompermaier wrote:
We also faced the same problem, but the number of jobs we can run before restarting the cluster depends on the volume of the data to shuffle around the network. We even had problems with a single job and in order to avoid OOM issues we had to put some configuration to limit Netty memory usage, i.e.:
 - Add to flink.yaml -> env.java.opts: -Dio.netty.recycler.maxCapacity.default=1
 - Edit taskmanager.sh and change TM_MAX_OFFHEAP_SIZE from 8388607T to 5g

At this purpose we wrote a small test to reproduce the problem and we opened an issue for that [1].
We still don't know if the problems are related however..

I hope that could be helpful,

On Wed, Oct 18, 2017 at 10:48 AM, Javier Lopez <[hidden email]> wrote:
Hi Robert,

Sorry to reply this late. We did a lot of tests, trying to identify if the problem was in our custom sources/sinks. We figured out that none of our custom components is causing this problem. We came up with a small test, and realized that the Flink nodes run out of non-heap JVM memory and crash after deployment of thousands of jobs. 

When rapidly deploying thousands or hundreds of thousands of Flink jobs - depending on job complexity in terms of resource consumption - Flink nodes non-heap JVM memory consumption grows until there is no more memory left on the machine and the Flink process crashes. Both TaskManagers and JobManager exhibit the same behavior. The TaskManagers die faster though. The memory consumption doesn't decrease after stopping the deployment of new jobs, with the cluster being idle (no running jobs). 

We could replicate the behavior by the rapid deployment of the WordCount Job provided in the Quickstart with a Python script.  We started 24 instances of the deployment script to run in parallel.

The non-heap JVM memory consumption grows faster with more complex jobs, i.e. reading from Kafka 10K events and printing to STDOUT( * ). Thus less deployed jobs are needed until the TaskManagers/JobManager dies.

We use Flink 1.3.2 in standalone mode on AWS EC2 t2.large nodes with 4GB RAM, inside Docker containers. For the test we used 2 TaskManagers and 1 JobManager.

( * ) For this case we used a slightly modified Python script which, after each deployment, waited 15 seconds for the 10K events to be read from Kafka and then canceled the freshly deployed job via the Flink REST API.
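The reproduction described above can be sketched roughly as follows. This is not the script from this thread: submit_job and cancel_job are hypothetical callables standing in for whatever Flink REST calls the real script made, and the defaults (24 workers, 15 seconds of waiting) simply mirror the numbers given above.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable

# Hypothetical stand-ins for the REST calls of the original script:
# submit_job() deploys a job and returns its id, cancel_job(job_id) cancels it.
SubmitFn = Callable[[], str]
CancelFn = Callable[[str], None]


def deploy_loop(submit_job: SubmitFn, cancel_job: CancelFn,
                iterations: int, wait_seconds: float) -> int:
    """One worker: deploy a job, give it time to consume its input, cancel it."""
    deployed = 0
    for _ in range(iterations):
        job_id = submit_job()
        time.sleep(wait_seconds)  # the Kafka variant waited 15 s for 10K events
        cancel_job(job_id)
        deployed += 1
    return deployed


def stress(submit_job: SubmitFn, cancel_job: CancelFn, workers: int = 24,
           iterations: int = 100, wait_seconds: float = 15.0) -> int:
    """Run `workers` deploy loops in parallel and return the total number of deployments."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(deploy_loop, submit_job, cancel_job, iterations, wait_seconds)
                   for _ in range(workers)]
        return sum(f.result() for f in futures)
```

Passing the calls in as functions, rather than hard-coding HTTP requests, keeps the sketch independent of the REST endpoints of any particular Flink version.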

If you want, we can provide the scripts and jobs we used for this test. Our current workaround restarts the Flink nodes once a memory threshold is reached, but this has lowered the availability of our services.
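The restart workaround boils down to a threshold check. A minimal sketch, assuming some way to read a node's memory usage (read_used_mb) and to restart it (restart), both hypothetical; the 90% threshold is an arbitrary example, not a value from this thread:

```python
from typing import Callable


def should_restart(used_mb: float, limit_mb: float, threshold: float = 0.9) -> bool:
    """True once usage reaches `threshold` (a fraction) of the node's memory limit."""
    return used_mb >= threshold * limit_mb


def watchdog_step(read_used_mb: Callable[[], float], restart: Callable[[], None],
                  limit_mb: float, threshold: float = 0.9) -> bool:
    """One polling step: restart the node if it is over the threshold; report whether it did."""
    if should_restart(read_used_mb(), limit_mb, threshold):
        restart()
        return True
    return False
```

Calling watchdog_step periodically for each node reproduces the workaround, and also makes its cost visible: every True is a node going down.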

Thanks for your help.

On 30 August 2017 at 10:39, Robert Metzger <[hidden email]> wrote:
I just saw that your other email is about the same issue.

Since you've already taken a heap dump, did you see any pattern in the allocated objects? Ideally, none of the classes from your user code should stick around when no job is running.
What's the size of the heap dump? I'm happy to take a look at it if it's reasonably small.

On Wed, Aug 30, 2017 at 10:27 AM, Robert Metzger <[hidden email]> wrote:
Hi Javier,

I'm not aware of such issues with Flink, but if you could give us some more details on your setup, I might get some more ideas on what to look for.

Are you using the RocksDBStateBackend? (RocksDB does some JNI allocations that could potentially leak memory.)
Also, are you passing any special garbage collector options? (Maybe some classes are not being unloaded.)
Are you using anything else that is special (such as protobuf or avro formats, or any other big library)?

Regards,
Robert



On Mon, Aug 28, 2017 at 5:04 PM, Javier Lopez <[hidden email]> wrote:
Hi all,

We are starting a lot of (streaming) Flink jobs, and after we have started 200 or more jobs we see that the non-heap memory in the TaskManagers increases a lot, to the point of killing the instances. We found that every time we start a new job, the committed non-heap memory increases by 5 to 10MB. Is this expected behavior? Are there ways to prevent this?
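Assuming the growth is linear and nothing is ever reclaimed, the reported rate already explains the failures: 200 jobs at 5 to 10MB each means 1 to 2GB of extra committed non-heap memory. A small sketch of that back-of-the-envelope estimate (the function names are illustrative, and the 2048MB budget in the last line is a made-up example):

```python
def nonheap_growth_mb(jobs: int, per_job_mb: float) -> float:
    """Committed non-heap growth if every job start adds a fixed, never-released amount."""
    return jobs * per_job_mb


def jobs_until_exhausted(budget_mb: float, per_job_mb: float) -> int:
    """How many job starts fit into a given non-heap budget at that rate."""
    return int(budget_mb // per_job_mb)


print(nonheap_growth_mb(200, 5), nonheap_growth_mb(200, 10))  # 1000 2000
print(jobs_until_exhausted(2048, 10))  # 204
```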







--
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 041809



Re: Off heap memory issue

Lopez, Javier
Hi Piotr,

We found the cause of the problem in the workers. After setting a value for -XX:MaxMetaspaceSize, we started to get OOM exceptions from the metaspace. We read how Flink manages user classes here https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/debugging_classloading.html and solved the problem by adding the job's jar file to the /lib folder of the nodes (master and workers). Now memory usage in the workers is constant.

Unfortunately, we still have an OOM problem on the master node. We use the same configuration as on the workers (200MB for MaxMetaspace and 13000MB for heap), and after ~6000 jobs the master runs out of memory. Metaspace usage is almost constant at around 50MB, and heap usage grows up to 10000MB before GC does its work and reduces it. But we still get the OOM errors. Do you have any other idea of what could cause this? Our workaround is to restart the master, but we cannot keep doing this in the long term.

Thanks for all your support; it has been helpful.

On 16 November 2017 at 15:27, Javier Lopez <[hidden email]> wrote:
Hi Piotr,

Sorry for the late response; I'm out of the office with limited Internet access. I think we are on the right path to solving this problem. Some time ago we did a memory analysis of 3 different clusters we use: two of them run jobs 24/7, and the third is the one deploying thousands of jobs. All of these clusters behave the same for char and byte arrays (as expected), but for the class "java.lang.Class" the clusters with 24/7 jobs have fewer than 20K instances, whereas the other cluster has 383,120 instances. I don't know if this could be related.
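One way to make such a comparison repeatable is to diff class histograms, for example the output of `jmap -histo <pid>` (or `jmap -histo:live <pid>`, which forces a full GC first so that only live objects are counted) taken from two clusters or from the same JVM at two points in time. A sketch, assuming the JDK 8 row layout `rank: instances bytes class-name`; newer JDKs append a module name, which the regex simply ignores. parse_histo and top_growth are illustrative names:

```python
import re
from typing import Dict, List, Tuple

# Data rows look like "   <rank>:   <instances>   <bytes>  <class name>";
# header, separator and "Total" lines do not match.
ROW = re.compile(r"^\s*\d+:\s+(\d+)\s+(\d+)\s+(\S+)")


def parse_histo(text: str) -> Dict[str, int]:
    """Map class name -> instance count from a class histogram dump."""
    counts: Dict[str, int] = {}
    for line in text.splitlines():
        m = ROW.match(line)
        if m:
            counts[m.group(3)] = int(m.group(1))
    return counts


def top_growth(before: str, after: str, n: int = 10) -> List[Tuple[str, int]]:
    """Classes whose instance count grew the most between two histograms."""
    old, new = parse_histo(before), parse_histo(after)
    deltas = [(cls, cnt - old.get(cls, 0)) for cls, cnt in new.items()]
    deltas = [d for d in deltas if d[1] > 0]
    deltas.sort(key=lambda d: d[1], reverse=True)
    return deltas[:n]
```

A steadily climbing java.lang.Class count at the top of such a diff points at classes (and their classloaders) that are never unloaded, which is what ends up in Metaspace.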

I hope we can test this soon, and I will let you know whether it fixes the problem.

Thanks.


On 15 November 2017 at 13:18, Piotr Nowojski <[hidden email]> wrote:
Hi,

I have been able to observe some off-heap memory “issues” by submitting the Kafka job provided by Javier Lopez (in a different mailing thread).

TL;DR:

There was no memory leak; the “Metaspace” and “Compressed Class Space” memory pools simply grow in size over time and are only rarely garbage collected. In my test case they were together wasting up to ~7GB of memory, while the test case could run with as little as ~100MB. Connect to your JVM with, for example, jconsole, check the size of these pools, and cut them by half by setting:

env.java.opts: -XX:CompressedClassSpaceSize=***M -XX:MaxMetaspaceSize=***M

in flink-conf.yaml. If everything works fine and memory consumption is still too high, rinse and repeat.
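The halving step can be written down as a tiny helper. This is only a sketch: halved_caps and env_java_opts are hypothetical names, the observed sizes (in MB) are whatever jconsole reports for your JVM, and because a flink-conf.yaml key holds a single value, any JVM options already set in env.java.opts (such as the Netty option mentioned earlier in this thread) have to go on the same line via `extra`.

```python
from typing import Tuple


def halved_caps(metaspace_mb: int, compressed_class_mb: int) -> Tuple[int, int]:
    """Halve the observed pool sizes (MB) to get the next pair of caps to try."""
    return metaspace_mb // 2, compressed_class_mb // 2


def env_java_opts(metaspace_mb: int, compressed_class_mb: int, extra: str = "") -> str:
    """Render the flink-conf.yaml line; `extra` carries JVM options that are already set."""
    flags = (f"-XX:CompressedClassSpaceSize={compressed_class_mb}M "
             f"-XX:MaxMetaspaceSize={metaspace_mb}M")
    opts = " ".join(o for o in (extra, flags) if o)
    return f"env.java.opts: {opts}"


# Observed ~600MB Metaspace and ~90MB Compressed Class Space:
print(env_java_opts(*halved_caps(600, 90)))
# env.java.opts: -XX:CompressedClassSpaceSize=45M -XX:MaxMetaspaceSize=300M
```

If the JVM stays stable with the new caps and memory is still too high, feed the newly observed sizes back into halved_caps.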


Long story:

With default settings and a max heap size of 1GB, the off-heap memory consumption of the non-heap memory pools “Metaspace” and “Compressed Class Space” kept growing over time, seemingly indefinitely, and Metaspace was always around ~6 times larger than Compressed Class Space. The default max Metaspace size is unlimited, while “Compressed Class Space” has a default max size of 1GB.
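The ~7GB figure is consistent with these numbers: if Compressed Class Space fills its 1GB default cap and Metaspace keeps tracking it at roughly 6 times its size, the two pools together reach about 7GB. A quick check in Python (the ratio is the approximate observation from this test, not a JVM constant):

```python
COMPRESSED_CLASS_SPACE_DEFAULT_MAX_MB = 1024  # default cap of 1GB
METASPACE_TO_CCS_RATIO = 6                    # observed, approximately


def pools_total_mb(ccs_mb: float, ratio: float = METASPACE_TO_CCS_RATIO) -> float:
    """Combined Metaspace + Compressed Class Space if Metaspace tracks CCS at `ratio`."""
    return ccs_mb + ratio * ccs_mb


print(pools_total_mb(COMPRESSED_CLASS_SPACE_DEFAULT_MAX_MB))  # 7168, i.e. ~7GB
print(pools_total_mb(90))  # 630, i.e. ~90MB + ~600MB
```

The second line matches the roughly 90MB plus 600MB reported with a 100MB CompressedClassSpaceSize cap.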

When I decreased CompressedClassSpaceSize to 100MB, memory consumption of that pool grew to 90MB and then started bouncing up and down by a couple of MB. “Metaspace” followed the same pattern, but used ~600MB. When I decreased MaxMetaspaceSize to 200MB, memory consumption of both pools bounced around ~220MB.

There seem to be no general guidelines for configuring these values, since they are heavily application dependent. However, this seems to be the most likely cause of the apparent off-heap “memory leak” that has been reported a couple of times in use cases where users submit hundreds or thousands of jobs to a Flink cluster. For more information please check here:


Please let us know if this solves your issues.

Thanks, Piotrek

On 13 Nov 2017, at 16:06, Flavio Pompermaier <[hidden email]> wrote:

Unfortunately, the issue I opened [1] was not a Flink problem; it was simply caused by an ever-increasing job plan.
So no help from that. Let's hope we find the real source of the problem.
Maybe using -Djdk.nio.maxCachedBufferSize could help (but I haven't tried it yet).

Best,

On Wed, Oct 18, 2017 at 2:07 PM, Kien Truong <[hidden email]> wrote:

Hi,

We saw a similar issue in one of our jobs, caused by a ByteBuffer memory leak [1].

We fixed it with the solution from the article, by setting -Djdk.nio.maxCachedBufferSize.

This system property is available in Java 8u102 and later.
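The mechanism behind this leak, as described in the linked article: when a heap ByteBuffer is used for NIO I/O, the JDK copies the data through a temporary direct buffer that it caches per thread, so every thread that once did a large read or write can keep a correspondingly large direct buffer alive. jdk.nio.maxCachedBufferSize (in bytes) stops buffers above that size from being cached. The model below is a simplification (one cached buffer per thread, while the real JDK cache can hold several), and retained_direct_bytes is an illustrative name:

```python
from typing import Dict, Iterable, Optional, Tuple


def retained_direct_bytes(io_calls: Iterable[Tuple[str, int]],
                          max_cached: Optional[int] = None) -> int:
    """Simplified model of the per-thread temporary direct buffer cache.

    io_calls: (thread name, bytes per I/O call) for I/O done with heap buffers.
    max_cached: models -Djdk.nio.maxCachedBufferSize; None means no limit.
    Returns the direct memory still held by the caches afterwards.
    """
    cache: Dict[str, int] = {}
    for thread, size in io_calls:
        if max_cached is not None and size > max_cached:
            continue  # too large to cache: allocated for this call only, then freed
        if size > cache.get(thread, 0):
            cache[thread] = size  # a larger buffer replaces the smaller cached one
    return sum(cache.values())
```

For example, 100 threads that each write 64MB once pin 6400MB of direct memory in this model, while with -Djdk.nio.maxCachedBufferSize=262144 nothing that large stays cached.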

Best regards,

Kien

[1] http://www.evanjones.ca/java-bytebuffer-leak.html



Re: Off heap memory issue

Piotr Nowojski
Hi,

OOMs from metaspace probably mean that your jars are not releasing some resources:

Regarding the second issue (I guess it is somehow related to the first one): if it is indeed a heap space OOM, it should be fairly easy to analyse/debug. This article describes how to track down such issues, especially the chapter titled “Using Java VisualVM”:
It should allow you to pinpoint the owner and the source of the leak.

Piotrek

On 12 Dec 2017, at 14:47, Javier Lopez <[hidden email]> wrote:

Hi Piotr,

We found the cause of the problem in the workers. After setting a value for -XX:MaxMetaspaceSize, we started to get OOM exceptions from the metaspace. We read how Flink manages user classes here https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/debugging_classloading.html and solved the problem by adding the job's jar file to the /lib folder of the nodes (master and workers). Now memory usage in the workers is constant.

Unfortunately, we still have an OOM problem on the master node. We use the same configuration as on the workers (200MB for MaxMetaspace and 13000MB for heap), and after ~6000 jobs the master runs out of memory. Metaspace usage is almost constant at around 50MB, and heap usage grows up to 10000MB before GC does its work and reduces it. But we still get the OOM errors. Do you have any other idea of what could cause this? Our workaround is to restart the master, but we cannot keep doing this in the long term.

Thanks for all your support; it has been helpful.
