Flink + Marathon (Mesos) Memory Issues


Flink + Marathon (Mesos) Memory Issues

ani.desh1512
*Background*: We have a Flink 1.4.0 setup. We run this Flink cluster via the
flink-jobmanager.sh foreground and flink-taskmanager.sh foreground commands
through Marathon (which launches them as Mesos jobs). So, basically, the
jobmanager and taskmanagers run as Mesos tasks.
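
To make the setup concrete, the Marathon app definition for a taskmanager
looks roughly like the sketch below. Only the command and the 18G memory
limit (mentioned again further down) reflect our actual setup; the id,
install path, cpus and instance count here are placeholders:

{
  "id": "/flink/taskmanager",
  "cmd": "/opt/flink-1.4.0/bin/flink-taskmanager.sh foreground",
  "cpus": 4,
  "mem": 18432,
  "instances": 3
}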


Now, say, we run the Flink taskmanagers with taskmanager.heap.mb set to 7G
in flink-conf.yaml and Marathon memory set to 18G. Even with this, we
frequently see the taskmanager containers getting killed because of OOM.
The Flink streaming job that we run is a basic job without any windowing or
other stateful operations. It just reads from a stream, applies a bunch of
transformations, and writes the result back via a BucketingSink. It uses
RocksDB as the state backend.
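
For reference, a stripped-down sketch of the kind of job we run is below.
The source, the filter, the class name and the paths are placeholders for
illustration, not our actual code:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

public class PassthroughJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Use RocksDB as the state backend; the checkpoint URI is a placeholder.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));

        // Placeholder source; the real job reads from an actual stream.
        DataStream<String> input = env.socketTextStream("localhost", 9999);

        // Stand-in for the "bunch of transformations".
        DataStream<String> transformed = input.filter(line -> !line.isEmpty());

        // Write the result back out via a BucketingSink (default string writer).
        transformed.addSink(new BucketingSink<String>("hdfs:///data/output"));

        env.execute("stream-to-bucketing-sink");
    }
}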

So what I am trying to understand is: how does Flink allocate taskmanager
memory inside containers? What would be a safe value for us to set as the
Marathon memory so that our taskmanagers don't keep getting killed? Are we
seeing this behaviour because we start the Flink taskmanagers in foreground
mode as Mesos tasks?
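
For concreteness, the relevant line of our flink-conf.yaml is essentially
the following (writing 7G as 7168 MB); the question above is effectively
what fills up the remaining 18432 - 7168 = 11264 MB of the Marathon limit
before the container is killed:

# Taskmanager JVM heap. Marathon's memory limit for the same app is 18G
# (18432 MB), so roughly 11 GB remains for everything outside the heap.
taskmanager.heap.mb: 7168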

Thanks



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Flink + Marathon (Mesos) Memory Issues

Stefan Richter
Hi,

Besides your configured heap size, there is also some off-heap memory used in the JVM process, in particular by RocksDB. Each keyed operator instance on a TM has its own RocksDB instance, so the question is how many are running in one container and how they are configured. For RocksDB, for example, write_buffer_size (32 MB default), write_buffer_count (3 by default), and block_cache_size (16 MB default) all contribute per instance. For more details, please have a look here: https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB. You might need to adjust your RocksDB configuration and/or plan your container memory limits accordingly to be on the safe side.
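
To make that concrete: with the defaults above, one RocksDB instance already accounts for roughly 32 MB * 3 + 16 MB ≈ 112 MB, before index and filter blocks, so many parallel keyed operator instances in one container add up quickly. If you want to shrink these values from within the job, a sketch along the following lines (via the RocksDB backend's OptionsFactory; the checkpoint URI and the concrete sizes are just example values, not recommendations) is one way to do it:

import java.io.IOException;

import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class TunedRocksDBBackend {

    // Builds a RocksDB state backend with smaller per-instance memory settings.
    public static RocksDBStateBackend create() throws IOException {
        RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints");
        backend.setOptions(new OptionsFactory() {
            @Override
            public DBOptions createDBOptions(DBOptions currentOptions) {
                return currentOptions; // keep DB-level defaults
            }

            @Override
            public ColumnFamilyOptions createColumnFamilyOptions(ColumnFamilyOptions currentOptions) {
                return currentOptions
                        .setWriteBufferSize(16 * 1024 * 1024)  // write_buffer_size: 16 MB instead of 32 MB
                        .setMaxWriteBufferNumber(2)            // fewer write buffers per instance
                        .setTableFormatConfig(new BlockBasedTableConfig()
                                .setBlockCacheSize(8 * 1024 * 1024)); // 8 MB block cache
            }
        });
        return backend;
    }
}

The job would then register it via env.setStateBackend(TunedRocksDBBackend.create()).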

Best,
Stefan

On 03.05.2018, at 21:59, ani.desh1512 <[hidden email]> wrote:

[...]


Re: Flink + Marathon (Mesos) Memory Issues

hao gao
Hi,

Since you mentioned BucketingSink, I think it may be related to your bucketer. Let's say you bucket by hour. If, at a given moment, your records' timestamps range from hour 00 to hour 23, then your task needs 24 writers, one dedicated to each bucket. If you have 4 task slots in a taskmanager, there are 24 * 4 writers open at the same time. If your writer is a Parquet writer, overall they may need a lot of memory.
Just my guess
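
To illustrate the guess: such a bucketer derives the bucket from the record's own timestamp rather than the wall clock, roughly like the sketch below (record type, paths and timeout values are made up). With timestamps spanning hour 00 to hour 23, each sink subtask can keep 24 writers open, and with 4 task slots that is 24 * 4 = 96 writers buffering in one taskmanager JVM. Making the sink close inactive buckets sooner is one knob that can cap how many stay open:

import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.flink.streaming.connectors.fs.Clock;
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.hadoop.fs.Path;

public class EventTimeBucketing {

    // Hypothetical record type carrying an event timestamp in milliseconds.
    public static class MyEvent {
        public long timestampMillis;
    }

    // Buckets by the hour taken from the record itself, not the current time.
    public static class HourBucketer implements Bucketer<MyEvent> {
        @Override
        public Path getBucketPath(Clock clock, Path basePath, MyEvent element) {
            // One directory per event-time hour; if incoming timestamps span all
            // 24 hours, up to 24 buckets (and writers) stay open per subtask.
            String hour = new SimpleDateFormat("yyyy-MM-dd--HH")
                    .format(new Date(element.timestampMillis));
            return new Path(basePath, hour);
        }
    }

    public static BucketingSink<MyEvent> createSink() {
        BucketingSink<MyEvent> sink = new BucketingSink<>("hdfs:///data/output");
        sink.setBucketer(new HourBucketer());
        sink.setInactiveBucketThreshold(60 * 1000L);      // close writers idle for a minute (example value)
        sink.setInactiveBucketCheckInterval(60 * 1000L);  // how often to check for idle buckets
        return sink;
    }
}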

2018-05-04 2:31 GMT-07:00 Stefan Richter <[hidden email]>:
[...]




--
Thanks
 - Hao