Flink Job Manager Memory Usage Keeps on growing when enabled checkpoint


Eleanore Jin
Hi all, 

I have a Flink job running version 1.10.2; it simply reads from a Kafka topic with 96 partitions and writes to another Kafka topic.

It runs in k8s with 1 JM (not in HA mode) and 12 task managers, each with 4 slots.
Checkpoints persist the snapshots to Azure Blob Storage, with a checkpoint interval of 3 seconds, a timeout of 10 seconds, and a minimum pause of 1 second.
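
For reference, a minimal sketch of how this checkpoint setup is expressed with Flink's DataStream API (simplified; the Azure account/container in the URI below is just a placeholder, not the real one):

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // checkpoint every 3 seconds
        env.enableCheckpointing(3_000);

        CheckpointConfig cc = env.getCheckpointConfig();
        cc.setCheckpointTimeout(10_000);           // 10 second checkpoint timeout
        cc.setMinPauseBetweenCheckpoints(1_000);   // minimum pause of 1 second between checkpoints

        // FsStateBackend writes checkpoint data to the given URI (placeholder account/container)
        env.setStateBackend(new FsStateBackend("wasbs://checkpoints@myaccount.blob.core.windows.net/flink"));
    }
}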

I observed that the job manager pod memory usage grows over time; any hints on why this is the case? The JM memory usage is also significantly higher than with checkpointing disabled.
[image: JM pod memory usage graph]

Thanks a lot!
Eleanore

Re: Flink Job Manager Memory Usage Keeps on growing when enabled checkpoint

Till Rohrmann
Hi Eleanore,

How much memory did you assign to the JM pod? Maybe the limit is so high that it takes a bit of time until GC is triggered. Have you checked whether the same problem also occurs with newer Flink versions?

The difference between checkpoints enabled and disabled is that the JM needs to do a bit more bookkeeping in order to track the completed checkpoints. If you are using the HeapStateBackend, then all states smaller than state.backend.fs.memory-threshold get inlined, meaning that they are sent to the JM and stored in the checkpoint meta file. This can increase the memory usage of the JM process. Depending on state.checkpoints.num-retained, this can grow as large as the number of retained checkpoints times the checkpoint size. However, I doubt that this adds up to several GB of additional space.
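
For illustration, these are the two settings in play; they are normally set in flink-conf.yaml, but sketched here programmatically (the values are just the 1.10 defaults, not a recommendation):

import org.apache.flink.configuration.Configuration;

public class CheckpointMetadataSettings {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // State handles smaller than this threshold are inlined into the checkpoint
        // metadata that the JM handles (1.10 default is 1 KB).
        conf.setString("state.backend.fs.memory-threshold", "1024");
        // Number of completed checkpoints the JM retains (default: 1).
        conf.setInteger("state.checkpoints.num-retained", 1);
        System.out.println(conf);
    }
}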

In order to better understand the problem, the debug logs of your JM could be helpful. Also a heap dump might be able to point us towards the component which is eating up so much memory.

Cheers,
Till


Re: Flink Job Manager Memory Usage Keeps on growing when enabled checkpoint

Eleanore Jin
Hi Till,
Thanks a lot for the prompt response; please see the information below.

1. How much memory is assigned to the JM pod?
6g for the container memory limit and 5g for jobmanager.heap.size; I think this is the only JM memory configuration available in Flink 1.10.2.

2. Have you tried newer Flink versions?
I am actually using Apache Beam, and the latest Flink version it supports is 1.10.

3. What state backend is used?
FsStateBackend, and the checkpoint size is around 12MB according to the checkpoint metrics, so I don't think it gets inlined.

4. What is state.checkpoints.num-retained?
I did not configure this explicitly, so by default only 1 should be retained.

5. Anything suspicious in the JM log?
There is no exception or error; the only thing I see is the log line below, which keeps repeating:

{"@timestamp":"2020-10-23T16:05:20.350Z","@version":"1","message":"Disabling threads for Delete operation as thread count 0 is <= 1","logger_name":"org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.AzureFileSystemThreadPoolExecutor","thread_name":"jobmanager-future-thread-4","level":"WARN","level_value":30000}


6. JVM args obtained via jcmd

-Xms5120m -Xmx5120m -XX:MaxGCPauseMillis=20 -XX:-OmitStackTraceInFastThrow


7. Heap info returned by jcmd <pid> GC.heap_info

It suggests only about 1G of the heap is used:

garbage-first heap   total 5242880K, used 1123073K [0x00000006c0000000, 0x0000000800000000)
  region size 2048K, 117 young (239616K), 15 survivors (30720K)
 Metaspace       used 108072K, capacity 110544K, committed 110720K, reserved 1146880K
  class space    used 12963K, capacity 13875K, committed 13952K, reserved 1048576K


8. top -p <pid>

It shows the Flink job manager Java process consuming about 4.8G of physical memory:

PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
    1 root      20   0 13.356g 4.802g  22676 S   6.0  7.6  37:48.62 java



Thanks a lot!
Eleanore



Re: Flink Job Manager Memory Usage Keeps on growing when enabled checkpoint

Eleanore Jin
Hi Till, 

please see the screenshot of the heap dump: https://ibb.co/92Hzrpr

Thanks!
Eleanore


Re: Flink Job Manager Memory Usage Keeps on growing when enabled checkpoint

Eleanore Jin
I also tried enabling native memory tracking via jcmd; here is the memory breakdown: https://ibb.co/ssrZB4F

Since the job manager memory configuration in Flink 1.10.2 only has jobmanager.heap.size, and that only translates into heap settings, should I also set -XX:MaxDirectMemorySize and -XX:MaxMetaspaceSize for the job manager? Any recommendations?
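
For reference, native memory tracking is typically enabled with the standard JVM flag and inspected with jcmd (the <pid> below is a placeholder):

  -XX:NativeMemoryTracking=summary       added to the JM's JVM options at startup
  jcmd <pid> VM.native_memory summary    run inside the pod to print the per-category breakdown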

Thanks a lot!
Eleanore


Re: Flink Job Manager Memory Usage Keeps on growing when enabled checkpoint

Till Rohrmann
Hi Eleanore,

Sorry for my late reply. The heap dump you sent does not look problematic. How exactly do you measure the pod memory usage? If you start the Flink process with -Xms5120m -Xmx5120m, then Flink should allocate 5120 MB of heap memory, so this should be exactly what you are seeing in your memory usage graph. This should happen independently of checkpointing.

Could you also share the debug logs with us? They might contain some more information.

Cheers,
Till


Re: Flink Job Manager Memory Usage Keeps on growing when enabled checkpoint

Eleanore Jin
Hi Till, 

Thanks for the response! The metrics come from cAdvisor and are visualized via the dashboard shipped with Kubernetes. I have actually been running the Flink job for the past 2 weeks and the memory usage has stabilized; there is no issue so far. I still could not figure out why it was trending up initially.

Thanks a lot for the help!
Eleanore


Re: Flink Job Manager Memory Usage Keeps on growing when enabled checkpoint

Till Rohrmann
Glad to hear that!

Cheers,
Till
