Memory constraints running Flink on Kubernetes

wvl
Hi,

We're running a relatively simple Flink application on Kubernetes that keeps a fair amount of state in RocksDB.
During development and the move to production, we repeatedly ran into memory issues, surfacing as Kubernetes OOMKilled events and Java OOM errors in the logs.

In order to tackle these, we're trying to account for all the memory used in the container, to allow proper tuning.
Metric-wise we have:
- container_memory_working_set_bytes = 6.5 GB
- flink_taskmanager_Status_JVM_Memory_Heap_Max = 4.7 GB
- flink_taskmanager_Status_JVM_Memory_NonHeap_Used = 325 MB
- flink_taskmanager_Status_JVM_Memory_Direct_MemoryUsed = 500 MB

This is my understanding based on all the documentation and observations:
container_memory_working_set_bytes should be the total memory in use by the container, excluding reclaimable OS page/block cache.
Heap is the JVM heap.
NonHeap is mostly metaspace.
Direct_Memory is mostly Flink's network buffers.

Running the numbers, I have about 1 GB unaccounted for. I'm also uncertain about RocksDB. According to the docs, RocksDB has a "Column Family Write Buffer" for which "You need to budget for 2 x your worst case memory use".
We have 17 ValueStateDescriptors (ignoring window state), which I'm assuming correspond to 17 column families in RocksDB, meaning our budget should be around 2 GB.
Is this accounted for in any of the flink_taskmanager metrics above? We've also enabled various RocksDB metrics, but it's unclear where this write buffer memory would be represented.

Finally, we've seen that when our job has issues and is restarted rapidly, NonHeap_Used grows from an initial 50 MB to 700 MB before our containers are killed. We're assuming this is because nothing in the metaspace is cleaned up as classes get (re)loaded.

These are our taskmanager JVM settings: -XX:+UseG1GC -XX:MaxDirectMemorySize=1G -XX:+UnlockExperimentalVMOptions -XX:+UseCGroupMemoryLimitForHeap -XX:MaxRAMFraction=2
With flink config:
      taskmanager.heap.size: 5000m
      state.backend: rocksdb
      state.backend.incremental: true
      state.backend.rocksdb.timer-service.factory: ROCKSDB

Based on what we've observed, we're thinking about setting -XX:MaxMetaspaceSize to a reasonable value, so that we at least get an error message that can easily be traced back to the behavior we're seeing.

Okay, all that said, let's sum up what we're asking here:
- Is there any deeper insight into how memory is accounted for beyond our current metrics?
- Which metric, if any, accounts for RocksDB memory usage?
- What's going on with the metaspace growth we're seeing during job restarts, and is there something we can do about it, such as setting -XX:MaxMetaspaceSize?
- Any other tips for improving reliability in resource-constrained environments such as Kubernetes?

Thanks,

William

Re: Memory constraints running Flink on Kubernetes

Xintong Song
Hi,

Flink acquires these 'Status_JVM_Memory' metrics through the MXBean library. According to the MXBean documentation, for non-heap "the Java virtual machine manages memory other than the heap (referred as non-heap memory)". I'm not sure whether that is equivalent to the metaspace. If you set '-XX:MaxMetaspaceSize', it should trigger metaspace cleanup when the limit is reached.
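
For reference, a minimal sketch of how these values can be read through the standard MXBeans (this is roughly the source such metrics draw on; it is not Flink's actual reporter code, and the class name is made up):

    import java.lang.management.BufferPoolMXBean;
    import java.lang.management.ManagementFactory;
    import java.lang.management.MemoryMXBean;
    import java.lang.management.MemoryPoolMXBean;

    public class JvmMemoryProbe {
        public static void main(String[] args) {
            MemoryMXBean mem = ManagementFactory.getMemoryMXBean();
            System.out.println("heap used     = " + mem.getHeapMemoryUsage().getUsed());
            System.out.println("heap max      = " + mem.getHeapMemoryUsage().getMax());
            System.out.println("non-heap used = " + mem.getNonHeapMemoryUsage().getUsed());

            // Metaspace is one of the non-heap memory pools.
            for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
                if ("Metaspace".equals(pool.getName())) {
                    System.out.println("metaspace used = " + pool.getUsage().getUsed());
                }
            }

            // Direct (off-heap) buffers, e.g. network buffers, show up in the "direct" buffer pool.
            for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
                if ("direct".equals(pool.getName())) {
                    System.out.println("direct used   = " + pool.getMemoryUsed());
                }
            }
        }
    }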

As for RocksDB, it mainly uses non-Java memory. Heap, non-heap and direct memory could be considered Java memory (or at least memory allocated through the Java process). That means RocksDB is actually using memory that is counted in the total K8s container memory but not in any of the Java heap / non-heap / direct memory, which in your case is the unaccounted 1 GB. To leave more memory for RocksDB, you need to either configure more memory for the K8s containers, or configure less Java memory through the config option 'taskmanager.heap.size'.

The config option 'taskmanager.heap.size', despite the 'heap' in its key, also accounts for network memory (which uses direct buffers). Currently, memory configuration in Flink is quite complicated and confusing. The community is aware of this and is planning an overall improvement.

To my understanding, once you set '-XX:MaxMetaspaceSize', there are limits on heap, non-heap and direct memory in the JVM. From the Java OOM error message you should be able to tell which part needed more memory than its limit. If there is no Java OOM but a K8s container OOM, then it should be non-Java memory used by RocksDB.


Thank you~

Xintong Song




Re: Memory constraints running Flink on Kubernetes

Yun Tang
Hi William

Have you ever set a memory limit on your taskmanager pod when launching it in k8s? If not, I'm afraid your node might run into node out-of-memory [1]. You could set the limit based on an analysis of your memory usage.
When talking about the memory usage of RocksDB, a rough formula is: block-cache-memory + column-family-number * write-buffer-memory * write-buffer-number + index&filter memory. The block cache size and the write-buffer size/number can be configured. The column-family number is determined by the number of states within your operators. The last part, index & filter memory, cannot be bounded well unless you also cache those blocks in the block cache [2] (but this would impact performance).
If you want memory stats from RocksDB, turning on the native metrics of RocksDB [3] is a good choice.
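
To make the formula concrete, a rough estimation sketch (every constant below is a placeholder chosen for illustration, not a measured or recommended value; the real block cache size, write-buffer size and buffer count depend on the RocksDB/Flink options and version in use, and the total multiplies again per RocksDB instance on the TM):

    // Hedged sketch of the rough formula above; every constant here is an assumption.
    public class RocksDbMemoryEstimate {
        static long mb(long n) { return n * 1024 * 1024; }

        public static void main(String[] args) {
            long blockCache = mb(8);          // assumed block cache capacity
            long columnFamilies = 17;         // e.g. one per state descriptor
            long writeBufferSize = mb(64);    // assumed write buffer size
            long writeBufferNumber = 2;       // assumed max write buffers per column family
            long indexAndFilter = mb(256);    // pure guess; hard to bound unless cached in the block cache

            long estimate = blockCache
                    + columnFamilies * writeBufferSize * writeBufferNumber
                    + indexAndFilter;
            System.out.printf("rough RocksDB estimate ≈ %d MB%n", estimate / mb(1));
        }
    }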



Best
Yun Tang


Re: Memory constraints running Flink on Kubernetes

Yang Wang

Hi,


The memory of a Flink TaskManager pod in k8s includes the following parts:

  • JVM heap, limited by -Xmx
  • JVM non-heap, limited by -XX:MaxMetaspaceSize
  • JVM direct memory, limited by -XX:MaxDirectMemorySize
  • native memory, used by RocksDB; as Yun Tang said, it can be limited by RocksDB configuration


So if your k8s pod is terminated with OOMKilled, the cause may be non-heap or native memory. I suggest adding the environment variable FLINK_ENV_JAVA_OPTS_TM="-XX:MaxMetaspaceSize=512m" in your taskmanager.yaml. Then only native memory can cause a container OOM. Leave enough headroom for RocksDB, and hopefully your job will run smoothly.
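
As a quick sanity check that such limits actually took effect inside the container, something like the following could be run in the TaskManager JVM (a sketch with a made-up class name; a max value of -1 means no explicit limit is set for that pool):

    import java.lang.management.ManagementFactory;
    import java.lang.management.MemoryPoolMXBean;

    public class LimitCheck {
        public static void main(String[] args) {
            // Effective heap limit (reflects -Xmx or the cgroup-derived default).
            System.out.println("heap max bytes = " + Runtime.getRuntime().maxMemory());

            // Effective metaspace limit (reflects -XX:MaxMetaspaceSize; -1 if unlimited).
            for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
                if ("Metaspace".equals(pool.getName())) {
                    System.out.println("metaspace max bytes = " + pool.getUsage().getMax());
                }
            }
        }
    }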



Re: Memory constraints running Flink on Kubernetes

wvl
Thanks for all the answers so far.

Especially clarifying was that RocksDB memory usage isn't accounted for in the Flink memory metrics. It's clear that we need to experiment to understand its memory usage, and knowing that we should be looking at the container memory usage minus all the JVM-managed memory helps.

In the meantime, we've set MaxMetaspaceSize to 200M based on our metrics. Sadly the resulting OOM does not lead to a better behaved job, because it seems the (taskmanager) JVM itself is not restarted - which makes sense in a multi-job environment.
So we're looking into ways to simply prevent this metaspace growth (e.g. putting the job's library jars in /lib on the TM).

Going back to RocksDB, the given formula "block-cache-memory + column-family-number * write-buffer-memory * write-buffer-number + index&filter memory" isn't completely clear to me.

Block Cache: "Out of box, RocksDB will use LRU-based block cache implementation with 8MB capacity"
Index & Filter Cache: "By default index and filter blocks are cached outside of block cache, and users won't be able to control how much memory should be use to cache these blocks, other than setting max_open_files." Flink's default settings don't set max_open_files, and the RocksDB default seems to be 1000 (https://github.com/facebook/rocksdb/blob/master/include/rocksdb/utilities/leveldb_options.h#L89), though we're not completely sure about this.
Write Buffer Memory: "The default is 64 MB. You need to budget for 2 x your worst case memory use."

May I presume a unique ValueStateDescriptor equals a column family?
If so, say I have 10 of those:
8 MB + (10 * 64 MB * 2) + $Index&FilterBlocks

So is that correct, and how would one calculate $Index&FilterBlocks? The docs suggest a relationship between max_open_files (1000) and the number of index/filter blocks that can be cached, but is this a 1-to-1 relationship? Anyway, this concept of blocks is still unclear to us.

> Have you ever set the memory limit of your taskmanager pod when launching it in k8s?

Definitely. We settled on 8 GB pods with taskmanager.heap.size: 5000m and 1 slot, and were looking into downsizing a bit to improve our pod-to-VM ratio.


Re: Memory constraints running Flink on Kubernetes

Yun Tang
Hi

It's definitely not easy to calculate the exact memory usage of RocksDB, but the formula "block-cache-memory + column-family-number * write-buffer-memory * write-buffer-number + index&filter memory" should give good enough hints.
As for the column-family number, it equals the number of your states: the declared state descriptors in an operator, plus potentially one window state (if you're using windows).
The default write-buffer number is at most 2 per column family, and the default write-buffer size is 4MB. Note that if you configure the RocksDB options yourself, the memory usage will differ from these defaults.
The last part, index & filter memory, is not easy to estimate, but from my experience it does not occupy too much memory unless you have many open files.

Last but not least, Flink enables slot sharing by default, so even if you have only one slot per taskmanager, there may be many RocksDB instances within that TM because many operators with keyed state can be running in it.

Apart from the theoretical analysis, you'd better turn on the RocksDB native metrics or track the memory usage of the pods through Prometheus on k8s.
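
If those options do need to be changed, here is a hedged sketch of how they can be tuned from Flink via the RocksDB state backend's OptionsFactory hook (CappedRocksDbOptions is a made-up name and the sizes are placeholders, not recommendations):

    import org.apache.flink.contrib.streaming.state.OptionsFactory;
    import org.rocksdb.BlockBasedTableConfig;
    import org.rocksdb.ColumnFamilyOptions;
    import org.rocksdb.DBOptions;

    public class CappedRocksDbOptions implements OptionsFactory {
        @Override
        public DBOptions createDBOptions(DBOptions currentOptions) {
            return currentOptions;
        }

        @Override
        public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
            return currentOptions
                    .setWriteBufferSize(32 * 1024 * 1024)      // placeholder: 32 MB per write buffer
                    .setMaxWriteBufferNumber(2)                // placeholder: at most 2 buffers per CF
                    .setTableFormatConfig(
                            new BlockBasedTableConfig()
                                    .setBlockCacheSize(64 * 1024 * 1024)); // placeholder: 64 MB block cache
        }
    }

    // Wiring it up in the job (sketch):
    //   RocksDBStateBackend backend = new RocksDBStateBackend(checkpointUri, true);
    //   backend.setOptions(new CappedRocksDbOptions());
    //   env.setStateBackend(backend);

Keep in mind that these column-family options apply per column family, and with slot sharing there can be several RocksDB instances per TaskManager, so any per-CF figure multiplies accordingly.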

Best
Yun Tang


Re: Memory constraints running Flink on Kubernetes

Yu Li
For the memory usage of RocksDB, there's already some discussion in FLINK-7289 and a good suggestion from Mike to use the WriteBufferManager to limit the total memory usage, FYI.
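
For illustration, a sketch of what that WriteBufferManager approach could look like through the same OptionsFactory hook, assuming the RocksDB version bundled with your Flink exposes WriteBufferManager in its Java API (BoundedWriteBufferOptions is a made-up name and the sizes are placeholders):

    import org.apache.flink.contrib.streaming.state.OptionsFactory;
    import org.rocksdb.Cache;
    import org.rocksdb.ColumnFamilyOptions;
    import org.rocksdb.DBOptions;
    import org.rocksdb.LRUCache;
    import org.rocksdb.WriteBufferManager;

    public class BoundedWriteBufferOptions implements OptionsFactory {
        // Shared across all column families created by this backend instance.
        private static final Cache CACHE = new LRUCache(256 * 1024 * 1024);   // placeholder: 256 MB
        private static final WriteBufferManager WBM =
                new WriteBufferManager(128 * 1024 * 1024, CACHE);             // placeholder: 128 MB cap

        @Override
        public DBOptions createDBOptions(DBOptions currentOptions) {
            // All memtables together are charged against the 128 MB budget above.
            return currentOptions.setWriteBufferManager(WBM);
        }

        @Override
        public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
            return currentOptions;
        }
    }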

We will work on making the memory management of state backends more "hands free" in a later release (probably 1.10), so please watch the release plan and/or the weekly community update [1] threads.

[1] https://s.apache.org/ix7iv

Best Regards,
Yu



Re: Memory constraints running Flink on Kubernetes

wvl
Excellent. Thanks for all the answers so far.

So, there was another issue I mentioned that we've made some progress in understanding, namely our metaspace growth when jobs restart often.

We can easily hit 1 GB of metaspace usage within 15 minutes if we restart often.
We attempted to troubleshoot this by looking at all the classes in metaspace using `jcmd <pid> GC.class_stats`.

Here we observed that after every job restart another entry is created for every class in our job, with the old classes showing InstBytes=0. So far so good, but the Total column for these old entries shows that memory is still being used.
Adding up all entries in the Total column indeed corresponds to our metaspace usage. So far we could only conclude that none of our job classes were being unloaded.

Then we stumbled upon this ticket. Here are our results from running the SocketWindowWordCount jar in a Flink 1.8.0 cluster with one taskmanager.

We obtain a class count with: jcmd 3052 GC.class_stats | grep -i org.apache.flink.streaming.examples.windowing.SessionWindowing | wc -l

First run:
  Class Count: 1
  Metaspace: 30695K

After ~800 runs:
  Class Count: 802
  Metaspace: 39406K


Interestingly, when we looked a bit later the class count slowly went down, step by step; just to be sure we used `jcmd <pid> GC.run` to force a GC every 30s or so. If I had to guess, it took about 20 minutes to go from ~800 to ~170, with metaspace dropping to 35358K. In a sense we've seen this behavior before, but with much larger increases in metaspace usage over far fewer job restarts.

I've added this information to https://issues.apache.org/jira/browse/FLINK-11205.
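
As a complement to jcmd, the same trend can be watched from inside the JVM with the standard class-loading and memory-pool MXBeans (a small sketch with a made-up class name, not something we have wired into our setup):

    import java.lang.management.ClassLoadingMXBean;
    import java.lang.management.ManagementFactory;
    import java.lang.management.MemoryPoolMXBean;

    public class ClassUnloadWatcher {
        public static void main(String[] args) throws InterruptedException {
            ClassLoadingMXBean cl = ManagementFactory.getClassLoadingMXBean();
            while (true) {
                long metaspaceUsed = 0;
                for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
                    if ("Metaspace".equals(pool.getName())) {
                        metaspaceUsed = pool.getUsage().getUsed();
                    }
                }
                // If "unloaded" stays flat while "loaded" keeps growing across restarts,
                // classes are not being unloaded and metaspace will keep climbing.
                System.out.printf("loaded=%d unloaded=%d metaspace=%dK%n",
                        cl.getTotalLoadedClassCount(),
                        cl.getUnloadedClassCount(),
                        metaspaceUsed / 1024);
                Thread.sleep(30_000);
            }
        }
    }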

That said, I'd really like to confirm the following:
- classes should usually only appear once in GC.class_stats output
- Flink / the JVM is very slow to clean up the metaspace
- something is clearly leaking during restarts


On Mon, Jul 29, 2019 at 9:52 AM Yu Li <[hidden email]> wrote:
For the memory usage of RocksDB, there's already some discussion in FLINK-7289 and a good suggestion from Mike to use the WriteBufferManager to limit the total memory usage, FYI.

We will drive to make the memory management of state backends more "hands free" in latter release (probably in release 1.10) and please watch the release plan and/or the weekly community update [1] threads.

[1] https://s.apache.org/ix7iv

Best Regards,
Yu


On Thu, 25 Jul 2019 at 15:12, Yun Tang <[hidden email]> wrote:
Hi

It's definitely not easy to calculate the accurate memory usage of RocksDB, but formula of "block-cache-memory + column-family-number * write-buffer-memory * write-buffer-number + index&filter memory"  should give enough sophisticated hints.
When talking about the column-family-number, they are equals to the number of your states which are the declared state descriptors in one operator and potential one window state (if you're using window).
The default writer-buffer-number is 2 at most for each column family, and the default write-buffer-memory size is 4MB. Pay attention that if you ever configure the options for RocksDB, these memory usage would differ from default values.
The last part of index&filter memory is not easy to estimate, but from my experience this part of memory would not occupy too much only if you have many open files.

Last but not least, Flink would enable slot sharing by default, and even if you only one slot per taskmanager, there might exists many RocksDB within that TM due to many operator with keyed state running.

Apart from the theoretical analysis, you'd better to open RocksDB native metrics or track the memory usage of pods through Prometheus with k8s. 

Best
Yun Tang

From: wvl <[hidden email]>
Sent: Thursday, July 25, 2019 17:50
To: Yang Wang <[hidden email]>
Cc: Yun Tang <[hidden email]>; Xintong Song <[hidden email]>; user <[hidden email]>
Subject: Re: Memory constrains running Flink on Kubernetes
 
Thanks for all the answers so far.

Especially clarifying was that RocksDB memory usage isn't accounted for in the flink memory metrics. It's clear that we need to experiment to understand it's memory usage and knowing that we should be looking at the container memory usage minus all the jvm managed memory, helps.

In mean while, we've set MaxMetaspaceSize to 200M based on our metrics. Sadly the resulting OOM does not result a better behaved job, because it would seem that the (taskmanager) JVM itself is not restarted - which makes sense in a multijob environment.
So we're looking into ways to simply prevent this metaspace growth (job library jars in /lib on TM).

Going back to RocksDB, the given formula "block-cache-memory + column-family-number * write-buffer-memory * write-buffer-number + index&filter memory." isn't completely clear to me.

Block Cache: "Out of box, RocksDB will use LRU-based block cache implementation with 8MB capacity"
Index & Filter Cache: "By default index and filter blocks are cached outside of block cache, and users won't be able to control how much memory should be use to cache these blocks, other than setting max_open_files.". The default settings doesn't set max_open_files and the rocksdb default seems to be 1000 (https://github.com/facebook/rocksdb/blob/master/include/rocksdb/utilities/leveldb_options.h#L89) .. not completely sure about this.
Write Buffer Memory: "The default is 64 MB. You need to budget for 2 x your worst case memory use."

May I presume a unique ValueStateDescriptor equals a Column Family?
If so, say I have 10 of those.
8MB + (10 * 64 * 2) + $Index&FilterBlocks

So is that correct and how would one calculate $Index&FilterBlocks? The docs suggest a relationship between max_open_files (1000) and the amount index/filter of blocks that can be cached, but is this a 1 to 1 relationship? Anyway, this concept of blocks is very unclear.

> Have you ever set the memory limit of your taskmanager pod when launching it in k8s?

Definitely. We settled on 8GB pods with taskmanager.heap.size: 5000m and 1 slot and were looking into downsizing a bit to improve our pod to VM ratio.

On Wed, Jul 24, 2019 at 11:07 AM Yang Wang <[hidden email]> wrote:

Hi,


The heap in a flink TaskManager k8s pod include the following parts:

  • jvm heap, limited by -Xmx
  • jvm non-heap, limited by -XX:MaxMetaspaceSize
  • jvm direct memory, limited by -XX:MaxDirectMemorySize
  • native memory, used by rocksdb, just as Yun Tang said, could be limited by rocksdb configurations


So if your k8s pod is terminated by OOMKilled, the cause may be the non-heap memory or native memory. I suggest you add an environment FLINK_ENV_JAVA_OPTS_TM="-XX:MaxMetaspaceSize=512m" in your taskmanager.yaml. And then only the native memory could cause OOM. Leave enough memory for rocksdb, and then hope your job could run smoothly.


Yun Tang <[hidden email]> 于2019年7月24日周三 下午3:01写道:
Hi William

Have you ever set the memory limit of your taskmanager pod when launching it in k8s? If not, I'm afraid your node might come across node out-of-memory [1]. You could increase the limit by analyzing your memory usage
When talking about the memory usage of RocksDB, a rough calculation formula could be: block-cache-memory + column-family-number * write-buffer-memory * write-buffer-number + index&filter memory. The block cache, write buffer memory&number could be mainly configured. And the column-family number is decided by the state number within your operator. The last part of index&filter memory cannot be measured well only if you also cache them in block cache [2] (but this would impact the performance).
If you want to the memory stats of rocksDB, turn on the native metrics of RocksDB [3] is a good choice.



Best
Yun Tang


Re: Memory constrains running Flink on Kubernetes

wvl
Btw, with regard to:

> The default writer-buffer-number is 2 at most for each column family, and the default write-buffer-memory size is 4MB.

This isn't what I see when looking at the OPTIONS-XXXXXX file in the rocksdb directories in state:

[CFOptions "xxxxxx"]
  ttl=0
  report_bg_io_stats=false
  compaction_options_universal={allow_trivial_move=false;size_ratio=1;min_merge_width=2;max_size_amplification_percent=200;max_merge_width=4294967295;compression_size_percent=-1;stop_style=kCompactionStopStyleTotalSize;}
  table_factory=BlockBasedTable
  paranoid_file_checks=false
  compression_per_level=
  inplace_update_support=false
  soft_pending_compaction_bytes_limit=68719476736
  max_successive_merges=0
  max_write_buffer_number=2
  level_compaction_dynamic_level_bytes=false
  max_bytes_for_level_base=268435456
  optimize_filters_for_hits=false
  force_consistency_checks=false
  disable_auto_compactions=false
  max_compaction_bytes=1677721600
  hard_pending_compaction_bytes_limit=274877906944
  compaction_options_fifo={allow_compaction=false;max_table_files_size=1073741824;ttl=0;}
  max_bytes_for_level_multiplier=10.000000
  level0_file_num_compaction_trigger=4
  level0_slowdown_writes_trigger=20
  compaction_pri=kByCompensatedSize
  compaction_filter=nullptr
  level0_stop_writes_trigger=36
  write_buffer_size=67108864
  min_write_buffer_number_to_merge=1
  num_levels=7
  target_file_size_multiplier=1
  arena_block_size=8388608
  memtable_huge_page_size=0
  bloom_locality=0
  inplace_update_num_locks=10000
  memtable_prefix_bloom_size_ratio=0.000000
  max_sequential_skip_in_iterations=8
  max_bytes_for_level_multiplier_additional=1:1:1:1:1:1:1
  compression=kSnappyCompression
  max_write_buffer_number_to_maintain=0
  bottommost_compression=kDisableCompressionOption
  comparator=leveldb.BytewiseComparator
  prefix_extractor=nullptr
  target_file_size_base=67108864
  merge_operator=StringAppendTESTOperator
  memtable_insert_with_hint_prefix_extractor=nullptr
  memtable_factory=SkipListFactory
  compaction_filter_factory=nullptr
  compaction_style=kCompactionStyleLevel

Are these options somehow not applied or overridden?
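For reference, this is roughly how we would pin those values explicitly instead of relying on whatever ends up in the generated OPTIONS file. It's only a minimal sketch against the OptionsFactory / RocksDBStateBackend#setOptions API as we understand it in Flink 1.8; the class name ExplicitRocksDbOptions and the 32 MB figure are ours, purely for illustration:

import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class ExplicitRocksDbOptions implements OptionsFactory {
    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        // Nothing DB-wide to change here; keep whatever Flink prepared.
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        // Applied to every column family, i.e. every state descriptor.
        return currentOptions
                .setWriteBufferSize(32 * 1024 * 1024)   // illustrative 32 MB instead of the observed 64 MB
                .setMaxWriteBufferNumber(2);
    }
}

// Wiring, as we understand it:
//   RocksDBStateBackend backend = new RocksDBStateBackend(checkpointPath, true);
//   backend.setOptions(new ExplicitRocksDbOptions());
//   env.setStateBackend(backend);

If the OPTIONS file still showed different values after something like this, at least we'd know the override happens elsewhere.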

On Mon, Jul 29, 2019 at 4:42 PM wvl <[hidden email]> wrote:
Excellent. Thanks for all the answers so far.

So there was another issue I mentioned that we've made some progress in understanding, namely our metaspace growth when jobs restart.

We can easily hit 1Gb metaspace usage within 15 minutes if we restart often.
We attempted to troubleshoot this issue by looking at all the classes in metaspace using `jcmd <pid> GC.class_stats`.

Here we observed that after every job restart another entry is created for every class in our job, where the old classes have InstBytes=0. So far so good, but the Total column for these entries shows that memory is still being used.
Also, adding up all entries in the Total column indeed corresponds to our metaspace usage. So far we could only conclude that none of our job classes were being unloaded.

Then we stumbled upon this ticket. Now here are our results running the SocketWindowWordCount jar in a flink 1.8.0 cluster with one taskmanager.

We achieve a class count by doing a jcmd 3052 GC.class_stats | grep -i org.apache.flink.streaming.examples.windowing.SessionWindowing | wc -l

First run:
  Class Count: 1
  Metaspace: 30695K

After 800~ runs:
  Class Count: 802
  Metaspace: 39406K


Interestingly, when we looked a bit later the class count slowly went down, step by step; just to be sure we used `jcmd <pid> GC.run` to force a GC every 30s or so. If I had to guess, it took about 20 minutes to go from ~800 to ~170, with metaspace dropping to 35358K. In a sense we've seen this behavior before, but with much larger increases in metaspace usage over far fewer job restarts.

I've added this information to https://issues.apache.org/jira/browse/FLINK-11205.

That said, I'd really like to confirm the following:
- classes should usually only appear once in GC.class_stats output
- flink / the jvm has very slow cleanup of the metaspace
- something clearly is leaking during restarts
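
As a side note on how we keep an eye on this from inside the JVM, here's a small sketch that polls metaspace usage via the standard java.lang.management MemoryPoolMXBean API (the class name is ours, and matching the pool by the name "Metaspace" assumes a HotSpot JVM):

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;

public class MetaspaceWatcher {
    public static void main(String[] args) throws InterruptedException {
        while (true) {
            for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
                // On HotSpot the metaspace pool is simply named "Metaspace".
                if (pool.getName().contains("Metaspace")) {
                    System.out.printf("%s: used=%d KB, committed=%d KB%n",
                            pool.getName(),
                            pool.getUsage().getUsed() / 1024,
                            pool.getUsage().getCommitted() / 1024);
                }
            }
            Thread.sleep(30_000); // roughly the same 30s cadence as the forced GC above
        }
    }
}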


On Mon, Jul 29, 2019 at 9:52 AM Yu Li <[hidden email]> wrote:
For the memory usage of RocksDB, there's already some discussion in FLINK-7289 and a good suggestion from Mike to use the WriteBufferManager to limit the total memory usage, FYI.

We will drive to make the memory management of state backends more "hands free" in a later release (probably release 1.10), so please watch the release plan and/or the weekly community update [1] threads.

[1] https://s.apache.org/ix7iv

Best Regards,
Yu


On Thu, 25 Jul 2019 at 15:12, Yun Tang <[hidden email]> wrote:
Hi

It's definitely not easy to calculate the accurate memory usage of RocksDB, but the formula "block-cache-memory + column-family-number * write-buffer-memory * write-buffer-number + index&filter memory" should give a reasonably good estimate.
As for the column-family-number, it equals the number of your states, i.e. the declared state descriptors in one operator plus potentially one window state (if you're using windows).
The default writer-buffer-number is 2 at most for each column family, and the default write-buffer-memory size is 4MB. Note that if you ever configure the RocksDB options yourself, the memory usage will differ from these default values.
The last part, index & filter memory, is not easy to estimate, but from my experience it would not occupy too much unless you have many open files.

Last but not least, Flink enables slot sharing by default, so even if you have only one slot per taskmanager, there might exist many RocksDB instances within that TM because many operators with keyed state can be running.

Apart from the theoretical analysis, you'd better turn on the RocksDB native metrics or track the memory usage of your pods through Prometheus on k8s.

Best
Yun Tang




Re: Memory constrains running Flink on Kubernetes

Yun Tang
You are correct, the default value of the write buffer size is 64 MB [1]. However, the Javadoc for this value is not correct [2]; I have already created a PR to fix this.


Best
Yun Tang



Re:Memory constrains running Flink on Kubernetes

shengjk1
In reply to this post by wvl
+1

I also encountered a similar problem, but I run a Flink application that uses state in RocksDB on YARN, and the YARN container was killed because of OOM.
I also went through the RocksDB tuning guide [1] and tuned some parameters, but it didn't help. For example:

import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.InfoLogLevel;

class MyOptions1 implements OptionsFactory {
    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        // DB-wide settings: cap the shared write buffer size and limit background work.
        return currentOptions.setDbWriteBufferSize(64 * 1024 * 1024)
                .setIncreaseParallelism(2)
                .setMaxBackgroundFlushes(2)
                .setInfoLogLevel(InfoLogLevel.DEBUG_LEVEL)
                .setMaxOpenFiles(4)
                .setUseFsync(false);
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        // Per-column-family settings: small block cache and a single 16 MB write buffer.
        return currentOptions.setTableFormatConfig(
                new BlockBasedTableConfig()
                        .setBlockCacheSize(16 * 1024 * 1024)
                        // increases read amplification but decreases memory usage and space amplification
                        .setBlockSize(16 * 1024 * 1024))
                .setWriteBufferSize(16 * 1024 * 1024)
                .setMaxWriteBufferNumber(1);
    }
}

Additionally, there is FLINK-7289, which is similar to our case, but I haven't found a good way to fix it.





Best,
Shengjk1




Re: Re:Memory constrains running Flink on Kubernetes

Yun Tang
Hi Shengjk1

setBlockCacheSize, setWriteBufferSize and setMaxWriteBufferNumber could help you control memory usage. However, Flink stores each state in its own column family, which increases the number of column families, and each column family has its own write buffers. FRocksDB [1] already plans to fix this by introducing RocksDB's write buffer manager feature. We will try to fix FLINK-7289 before the Flink 1.10 release.

If you urgently need to fix this problem, I have an unofficial build of FRocksDB based on RocksDB 5.18.3, which has been verified to work well based on Gyula Fora's experience. You could contact me in private to get this jar package and rebuild your Flink runtime to enable the write buffer manager feature.
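
For reference, using the write buffer manager would look roughly like the sketch below. This assumes the RocksDB 5.18 Java API (WriteBufferManager, LRUCache, DBOptions#setWriteBufferManager); whether those classes are available depends on the RocksDB build bundled with your Flink version, which is exactly why the custom FRocksDB build is needed. The class name and the 256 MB / 128 MB budgets are only illustrative:

import org.rocksdb.Cache;
import org.rocksdb.DBOptions;
import org.rocksdb.LRUCache;
import org.rocksdb.RocksDB;
import org.rocksdb.WriteBufferManager;

public class WriteBufferManagerSketch {
    public static void main(String[] args) {
        RocksDB.loadLibrary();

        long memtableBudgetBytes = 256L * 1024 * 1024;  // cap for all memtables together, illustrative
        long blockCacheBytes = 128L * 1024 * 1024;      // shared block cache, illustrative
        Cache sharedBlockCache = new LRUCache(blockCacheBytes);

        // Charge memtable memory against the shared cache so one budget covers both.
        WriteBufferManager writeBufferManager = new WriteBufferManager(memtableBudgetBytes, sharedBlockCache);

        DBOptions dbOptions = new DBOptions().setWriteBufferManager(writeBufferManager);

        // Within Flink, this DBOptions would be returned from OptionsFactory#createDBOptions(...)
        // rather than constructed standalone.
        System.out.printf("memtable budget=%d MB, shared block cache=%d MB%n",
                memtableBudgetBytes >> 20, blockCacheBytes >> 20);
    }
}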



Best
Yun Tang

