Memory Issue

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

Memory Issue

Govindarajan Srinivasaraghavan
Hi,

I have a pipeline running on flink which ingests around 6k messages per second. Each message is around 1kb and it passes through various stages like filter, 5 sec tumbling window per key etc.. and finally flatmap to computation before sending it to kafka sink. The data is first ingested as protocol buffers and then in subsequent operators they are converted into POJO's.

There are lots objects created inside the user functions and some of them are cached as well. I have been running this pipeline on 48 task slots across 3 task manages with each one allocated with 22GB memory.

The issue I'm having is within a period of 10 hours, almost 19k young generation GC have been run which is roughly every 2 seconds and GC time taken value is more than 2 million. I have also enabled object reuse. Any suggestions on how this issue could be resolved? Thanks.

Regards,
Govind

Reply | Threaded
Open this post in threaded view
|

Re: Memory Issue

Jörn Franke
One would need to look at your code and possible on some heap statistics. Maybe something wrong happens when you cache them (do you use a 3rd party library or your own implementation?). Do you use a stable version of your protobuf library (not necessarily the most recent). You also may want to look at buffers to avoid creating objects (bytebuffer, stringbuffer etc).

Probably you are creating a lot of objects due to conversion into PoJo. You could increase the heap for the Java objects of the young generation.
You can also switch to the G1-Garbage collector (if Jdk 8) or at least the parallel one.
Generally you should avoid creating PoJo/objects as much as possible in a long running Streaming job.



> On 21. Aug 2017, at 05:29, Govindarajan Srinivasaraghavan <[hidden email]> wrote:
>
> Hi,
>
> I have a pipeline running on flink which ingests around 6k messages per second. Each message is around 1kb and it passes through various stages like filter, 5 sec tumbling window per key etc.. and finally flatmap to computation before sending it to kafka sink. The data is first ingested as protocol buffers and then in subsequent operators they are converted into POJO's.
>
> There are lots objects created inside the user functions and some of them are cached as well. I have been running this pipeline on 48 task slots across 3 task manages with each one allocated with 22GB memory.
>
> The issue I'm having is within a period of 10 hours, almost 19k young generation GC have been run which is roughly every 2 seconds and GC time taken value is more than 2 million. I have also enabled object reuse. Any suggestions on how this issue could be resolved? Thanks.
>
> Regards,
> Govind
>
Reply | Threaded
Open this post in threaded view
|

Re: Memory Issue

Govindarajan Srinivasaraghavan
I have couple more questions regarding flink's jvm memory.

In a streaming application what is managed memory used for? I read from a blog that all objects created inside the user function will go into unmanaged memory. Where does the managed key/ operator state state reside?

Also when does the state gets persisted into rocksdb, is it only when checkpointing is enabled? If the state backend is rocksdb but the checkpointing is not enabled what will happen?

Thanks.

On Sun, Aug 20, 2017 at 11:14 PM, Jörn Franke <[hidden email]> wrote:
One would need to look at your code and possible on some heap statistics. Maybe something wrong happens when you cache them (do you use a 3rd party library or your own implementation?). Do you use a stable version of your protobuf library (not necessarily the most recent). You also may want to look at buffers to avoid creating objects (bytebuffer, stringbuffer etc).

Probably you are creating a lot of objects due to conversion into PoJo. You could increase the heap for the Java objects of the young generation.
You can also switch to the G1-Garbage collector (if Jdk 8) or at least the parallel one.
Generally you should avoid creating PoJo/objects as much as possible in a long running Streaming job.



> On 21. Aug 2017, at 05:29, Govindarajan Srinivasaraghavan <[hidden email]> wrote:
>
> Hi,
>
> I have a pipeline running on flink which ingests around 6k messages per second. Each message is around 1kb and it passes through various stages like filter, 5 sec tumbling window per key etc.. and finally flatmap to computation before sending it to kafka sink. The data is first ingested as protocol buffers and then in subsequent operators they are converted into POJO's.
>
> There are lots objects created inside the user functions and some of them are cached as well. I have been running this pipeline on 48 task slots across 3 task manages with each one allocated with 22GB memory.
>
> The issue I'm having is within a period of 10 hours, almost 19k young generation GC have been run which is roughly every 2 seconds and GC time taken value is more than 2 million. I have also enabled object reuse. Any suggestions on how this issue could be resolved? Thanks.
>
> Regards,
> Govind
>

Reply | Threaded
Open this post in threaded view
|

Re: Memory Issue

Stephan Ewen
Hi!

RocksDB will be used when it is selected as the state backend, independent of the checkpointing configuration.

Using RocksDB as the state backend, Flink will have some objects on the heap, like timers (we will move them to RocksDB as well in the near future) but the majority will be off heap.

Stephan


On Thu, Aug 24, 2017 at 5:28 AM, Govindarajan Srinivasaraghavan <[hidden email]> wrote:
I have couple more questions regarding flink's jvm memory.

In a streaming application what is managed memory used for? I read from a blog that all objects created inside the user function will go into unmanaged memory. Where does the managed key/ operator state state reside?

Also when does the state gets persisted into rocksdb, is it only when checkpointing is enabled? If the state backend is rocksdb but the checkpointing is not enabled what will happen?

Thanks.

On Sun, Aug 20, 2017 at 11:14 PM, Jörn Franke <[hidden email]> wrote:
One would need to look at your code and possible on some heap statistics. Maybe something wrong happens when you cache them (do you use a 3rd party library or your own implementation?). Do you use a stable version of your protobuf library (not necessarily the most recent). You also may want to look at buffers to avoid creating objects (bytebuffer, stringbuffer etc).

Probably you are creating a lot of objects due to conversion into PoJo. You could increase the heap for the Java objects of the young generation.
You can also switch to the G1-Garbage collector (if Jdk 8) or at least the parallel one.
Generally you should avoid creating PoJo/objects as much as possible in a long running Streaming job.



> On 21. Aug 2017, at 05:29, Govindarajan Srinivasaraghavan <[hidden email]> wrote:
>
> Hi,
>
> I have a pipeline running on flink which ingests around 6k messages per second. Each message is around 1kb and it passes through various stages like filter, 5 sec tumbling window per key etc.. and finally flatmap to computation before sending it to kafka sink. The data is first ingested as protocol buffers and then in subsequent operators they are converted into POJO's.
>
> There are lots objects created inside the user functions and some of them are cached as well. I have been running this pipeline on 48 task slots across 3 task manages with each one allocated with 22GB memory.
>
> The issue I'm having is within a period of 10 hours, almost 19k young generation GC have been run which is roughly every 2 seconds and GC time taken value is more than 2 million. I have also enabled object reuse. Any suggestions on how this issue could be resolved? Thanks.
>
> Regards,
> Govind
>


Reply | Threaded
Open this post in threaded view
|

Re: Memory Issue

Govindarajan Srinivasaraghavan
Thanks Stephan, any pointers on how managed memory is used in streaming application will really help.

Regards,
Govind

On Aug 24, 2017, at 1:53 AM, Stephan Ewen <[hidden email]> wrote:

Hi!

RocksDB will be used when it is selected as the state backend, independent of the checkpointing configuration.

Using RocksDB as the state backend, Flink will have some objects on the heap, like timers (we will move them to RocksDB as well in the near future) but the majority will be off heap.

Stephan


On Thu, Aug 24, 2017 at 5:28 AM, Govindarajan Srinivasaraghavan <[hidden email]> wrote:
I have couple more questions regarding flink's jvm memory.

In a streaming application what is managed memory used for? I read from a blog that all objects created inside the user function will go into unmanaged memory. Where does the managed key/ operator state state reside?

Also when does the state gets persisted into rocksdb, is it only when checkpointing is enabled? If the state backend is rocksdb but the checkpointing is not enabled what will happen?

Thanks.

On Sun, Aug 20, 2017 at 11:14 PM, Jörn Franke <[hidden email]> wrote:
One would need to look at your code and possible on some heap statistics. Maybe something wrong happens when you cache them (do you use a 3rd party library or your own implementation?). Do you use a stable version of your protobuf library (not necessarily the most recent). You also may want to look at buffers to avoid creating objects (bytebuffer, stringbuffer etc).

Probably you are creating a lot of objects due to conversion into PoJo. You could increase the heap for the Java objects of the young generation.
You can also switch to the G1-Garbage collector (if Jdk 8) or at least the parallel one.
Generally you should avoid creating PoJo/objects as much as possible in a long running Streaming job.



> On 21. Aug 2017, at 05:29, Govindarajan Srinivasaraghavan <[hidden email]> wrote:
>
> Hi,
>
> I have a pipeline running on flink which ingests around 6k messages per second. Each message is around 1kb and it passes through various stages like filter, 5 sec tumbling window per key etc.. and finally flatmap to computation before sending it to kafka sink. The data is first ingested as protocol buffers and then in subsequent operators they are converted into POJO's.
>
> There are lots objects created inside the user functions and some of them are cached as well. I have been running this pipeline on 48 task slots across 3 task manages with each one allocated with 22GB memory.
>
> The issue I'm having is within a period of 10 hours, almost 19k young generation GC have been run which is roughly every 2 seconds and GC time taken value is more than 2 million. I have also enabled object reuse. Any suggestions on how this issue could be resolved? Thanks.
>
> Regards,
> Govind
>