Memory usage increases on every job restart resulting in eventual OOMKill


Memory usage increases on every job restart resulting in eventual OOMKill

Randal Pitt
Hi,

We're running Flink 1.11.3 on Kubernetes. We have a job with a parallelism of
10 running on 10 task managers, each with 1 task slot. The job has 4 time
windows over 2 different keys: 2 windows use reducers and 2 are processed by
window functions. State is stored in RocksDB.
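
For illustration, a minimal sketch of that kind of topology (the class, key
and window names below are invented for the example, not taken from the
actual job):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WindowTopologySketch {

    // Simple POJO standing in for the real event type.
    public static class Event {
        public String keyA;
        public String keyB;
        public long timestamp;
        public long value;

        public Event() {}

        public Event(String keyA, String keyB, long timestamp, long value) {
            this.keyA = keyA;
            this.keyB = keyB;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Event> events = env
                .fromElements(
                        new Event("a", "x", 1_000L, 1),
                        new Event("b", "y", 2_000L, 2))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forMonotonousTimestamps()
                                .withTimestampAssigner((e, ts) -> e.timestamp));

        // Window pair on the first key: one aggregated incrementally with a
        // reducer, one buffering all elements for a window function.
        events.keyBy(e -> e.keyA)
              .window(TumblingEventTimeWindows.of(Time.minutes(5)))
              .reduce((a, b) -> new Event(a.keyA, a.keyB,
                      Math.max(a.timestamp, b.timestamp), a.value + b.value))
              .print();

        events.keyBy(e -> e.keyA)
              .window(TumblingEventTimeWindows.of(Time.hours(1)))
              .process(new ProcessWindowFunction<Event, Long, String, TimeWindow>() {
                  @Override
                  public void process(String key, Context ctx,
                                      Iterable<Event> elements, Collector<Long> out) {
                      long sum = 0;
                      for (Event e : elements) {
                          sum += e.value;
                      }
                      out.collect(sum);
                  }
              })
              .print();

        // The same reducer/window-function pair is repeated for keyB in the real job.

        env.execute("window-topology-sketch");
    }
}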

We've noticed that when a pod is restarted (say, if the node it was on is
restarted), the job restarts and the memory usage of the remaining 9 pods
increases by roughly 1GB over the next 1-2 hours, then stays at that level.
If another pod restarts, the remaining 9 increase in memory usage again.
Eventually one or more pods reach the 6GB limit and are OOMKilled, leading
to the job restarting and memory usage increasing again.

If left unchecked this can reach the point where one OOMKill directly causes
another, which directly causes another. At that point it requires manual
intervention to resolve.

I think it's very likely the excess memory usage is in RocksDB rather than
Flink itself. My question is: is there anything we can do about the increase
in memory usage after a failure?

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2869/Screenshot_2021-02-02_at_11.png>

Best regards,

Randal.



Re: Memory usage increases on every job restart resulting in eventual OOMKill

Xintong Song
Hi Randal,
The image is too blurry to make out clearly.
I have a few questions:
- IIUC, you are using the standalone K8s deployment [1], not the native K8s deployment [2]. Could you confirm that?
- How is the memory measured?


Re: Memory usage increases on every job restart resulting in eventual OOMKill

Randal Pitt
Hi Xintong Song,

Correct, we are using standalone K8s. Task managers are deployed as a
statefulset so they have consistent pod names. We tried using native K8s (in
fact I'd prefer to) but got persistent
"io.fabric8.kubernetes.client.KubernetesClientException: too old resource
version: 242214695 (242413759)" errors, which resulted in the job being
restarted every 30-60 minutes.

We are using Prometheus Node Exporter to capture memory usage. The graph
shows the metric:

sum(container_memory_usage_bytes{container_name="taskmanager",pod_name=~"$flink_task_manager"})
by (pod_name)

I've attached the original
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2869/Screenshot_2021-02-02_at_11.png>
so Nabble doesn't shrink it.

Best regards,

Randal.





Re: Memory usage increases on every job restart resulting in eventual OOMKill

Xintong Song
> How is the memory measured?
I meant: which Flink or K8s metric is collected? I'm asking because, depending
on which metric is used, the *container memory usage* can be defined
differently, e.g. whether mmap memory is included.
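
For example, assuming the query is scraping the usual cAdvisor metrics (as the
container_memory_usage_bytes name suggests), these two queries can report quite
different numbers; the exact label names depend on your Kubernetes/cAdvisor
version:

# Includes page cache and (potentially) mmap-ed memory, so it can look much
# larger than what the process strictly needs.
sum(container_memory_usage_bytes{container_name="taskmanager"}) by (pod_name)

# What the kubelet compares against the limit for eviction decisions, and a
# closer proxy for OOMKill risk.
sum(container_memory_working_set_bytes{container_name="taskmanager"}) by (pod_name)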

Also, could you share the effective memory configurations for the taskmanagers? You should find something like the following at the beginning of the taskmanager logs.

INFO  [] - Final TaskExecutor Memory configuration:
INFO  [] -   Total Process Memory:          1.688gb (1811939328 bytes)
INFO  [] -     Total Flink Memory:          1.250gb (1342177280 bytes)
INFO  [] -       Total JVM Heap Memory:     512.000mb (536870902 bytes)
INFO  [] -         Framework:               128.000mb (134217728 bytes)
INFO  [] -         Task:                    384.000mb (402653174 bytes)
INFO  [] -       Total Off-heap Memory:     768.000mb (805306378 bytes)
INFO  [] -         Managed:                 512.000mb (536870920 bytes)
INFO  [] -         Total JVM Direct Memory: 256.000mb (268435458 bytes)
INFO  [] -           Framework:             128.000mb (134217728 bytes)
INFO  [] -           Task:                  0 bytes
INFO  [] -           Network:               128.000mb (134217730 bytes)
INFO  [] -     JVM Metaspace:               256.000mb (268435456 bytes)
INFO  [] -     JVM Overhead:                192.000mb (201326592 bytes)
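
For reference, a total like the one above (1.688gb / 1811939328 bytes
corresponds to 1728m) would typically come from flink-conf.yaml settings along
these lines; the values below are only an illustrative sketch, not a
recommendation for this job:

# Total memory of the TaskManager process; the other pools are derived from it.
taskmanager.memory.process.size: 1728m
# Fraction of Flink memory given to managed memory, which RocksDB uses when
# state.backend.rocksdb.memory.managed is true (the default).
taskmanager.memory.managed.fraction: 0.4
taskmanager.memory.jvm-metaspace.size: 256m
state.backend: rocksdb
state.backend.rocksdb.memory.managed: true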


Thank you~

Xintong Song



Re: Memory usage increases on every job restart resulting in eventual OOMKill

Lasse Nedergaard
Hi

We had something similar and our problem was class loader leaks. We used a summary log component to reduce logging, but it turned out it used a static object that wasn't released when we got an OOM or a restart. Flink was reusing the task managers, so the only workaround was to stop the job, wait until the task managers were removed, and start again, until we fixed the underlying problem.

Med venlig hilsen / Best regards
Lasse Nedergaard


Re: Memory usage increases on every job restart resulting in eventual OOMKill

Yun Tang
Hi Randal,

Please consider using jemalloc instead of glibc as the default memory allocator [1] to avoid memory fragmentation. As far as I know, at least two groups of users, running Flink on YARN and on K8s respectively, have reported a similar problem of memory continuing to grow after every restart [2]. In both cases the problem went away once they switched to jemalloc.
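
For anyone who wants a concrete starting point before upgrading, a sketch of a
patched image (the package name and library path below are for the Debian-based
official amd64 images; verify them for your own base image):

FROM flink:1.11.3-scala_2.12
# Install jemalloc and preload it so that RocksDB/native allocations go
# through it instead of the glibc allocator.
RUN apt-get update && \
    apt-get install -y --no-install-recommends libjemalloc2 && \
    rm -rf /var/lib/apt/lists/*
ENV LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.2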


Best
Yun Tang

Re: Memory usage increases on every job restart resulting in eventual OOMKill

Randal Pitt
Thanks everyone for the responses.

I tried out the jemalloc suggestion from FLINK-19125 using a patched 1.11.3
image and so far it appears to be working well. I see it's included in 1.12.1
and Docker images are available, so I'll look at upgrading too.

Best regards,

Randal.


