Possible memory leak in JobManager (Flink 1.10.0)?

Marc LEGER
Hello,

I am currently testing Flink 1.10.0 but I am facing memory issues with JobManagers deployed in a standalone cluster configured in HA mode with 3 TaskManagers (and 3 running jobs).
I cannot reproduce these issues with Flink 1.7.2.

Basically, whatever the value of the "jobmanager.heap.size" property (I tried 2 GB, then 4 GB and finally 8 GB), the leader JobManager process eventually consumes all available memory and hangs after a few hours or days (depending on the size of the heap) before being disassociated from the cluster.

I am using OpenJ9 JVM with Java 11 on CentOS 7.6 machines:
openjdk version "11.0.6" 2020-01-14
OpenJDK Runtime Environment AdoptOpenJDK (build 11.0.6+10)
Eclipse OpenJ9 VM AdoptOpenJDK (build openj9-0.18.1, JRE 11 Linux amd64-64-Bit Compressed

I performed a heap dump for analysis on the JobManager Java process and generated a "Leak Suspects" report using Eclipse MAT.
The tool is detecting one main suspect (cf. attached screenshots):

One instance of "java.util.concurrent.ThreadPoolExecutor" loaded by "<system class loader>" occupies 580,468,280 (92.82%) bytes. The instance is referenced by org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices @ 0x8041fb48 , loaded by "<system class loader>".

Has anyone already faced such an issue?

Best Regards,
Marc

Attachments: leak_suspect.png (43K), dominator_tree.png (83K), shortest_paths_to_acumulation_point.png (112K)

Re: Possible memory leak in JobManager (Flink 1.10.0)?

Yun Tang
Hi Marc

I think the memory is occupied by completed checkpoints awaiting removal, which are held in the workQueue of the io-executor [1] used by ZooKeeperCompletedCheckpointStore [2]. One clue supporting this is that Executors#newFixedThreadPool creates a ThreadPoolExecutor with a LinkedBlockingQueue to store runnables.
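
The queueing behaviour described above can be reproduced outside Flink. The following is a minimal sketch, not Flink code: the class name, the method names and the 1 KB payload are hypothetical stand-ins for pending discard tasks and the checkpoint metadata they hold. It blocks every worker of a pool created with Executors#newFixedThreadPool and shows that all further tasks pile up in the executor's queue, which has no capacity limit by default.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class UnboundedQueueDemo {

    /** Returns the simple class name of the work queue behind a fixed thread pool. */
    static String queueType() {
        ThreadPoolExecutor executor =
                (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
        try {
            return executor.getQueue().getClass().getSimpleName();
        } finally {
            executor.shutdown();
        }
    }

    /**
     * Blocks all workers of a fixed pool, submits `tasks` more runnables and
     * returns how many of them are waiting in the work queue.
     */
    static int queuedTasks(int threads, int tasks) {
        ThreadPoolExecutor executor =
                (ThreadPoolExecutor) Executors.newFixedThreadPool(threads);
        CountDownLatch started = new CountDownLatch(threads);
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Occupy every worker, standing in for slow discard calls.
            for (int i = 0; i < threads; i++) {
                executor.execute(() -> {
                    started.countDown();
                    awaitQuietly(release);
                });
            }
            awaitQuietly(started);
            // Each queued runnable keeps its payload reachable until it runs.
            for (int i = 0; i < tasks; i++) {
                byte[] payload = new byte[1024]; // hypothetical metadata size
                executor.execute(() -> touch(payload));
            }
            return executor.getQueue().size();
        } finally {
            release.countDown(); // unblock the workers so the queue drains
            executor.shutdown();
        }
    }

    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    static void touch(byte[] payload) {
        // Intentionally empty: the task only needs to reference the payload.
    }

    public static void main(String[] args) {
        System.out.println("queue type = " + queueType());
        System.out.println("queued     = " + queuedTasks(2, 1000));
    }
}
```

Because nothing rejects or throttles submissions, the queue grows for as long as tasks arrive faster than the workers finish them, and everything the queued runnables reference stays on the heap.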

To find the root cause, could you please check the following:
  1. How large is your checkpoint metadata? You can look at the size of {checkpoint-dir}/chk-X/_metadata. Knowing which state backend you use would also help.
  2. What is your checkpoint interval? A shorter interval can accumulate many completed checkpoints that have to be subsumed once a newer checkpoint completes.


Best
Yun Tang

Re: Possible memory leak in JobManager (Flink 1.10.0)?

Marc LEGER
Hello Yun,

Thank you for your feedback. Please find my answers to your questions below:

1. I am using incremental state checkpointing with the RocksDB backend and AWS S3 as the distributed file system. Everything is configured in flink-conf.yaml as follows:

state.backend: rocksdb
state.backend.incremental: true
# placeholders are replaced at deploy time
state.checkpoints.dir: s3://#S3_BUCKET#/#SERVICE_ID#/flink/checkpoints
state.backend.rocksdb.localdir: /home/data/flink/rocksdb

Size of the _metadata file in a checkpoint folder for each of the 3 running jobs:
- job1: 64 KB
- job2: 1 KB
- job3: 10 KB

By the way, I have between 10000 and 20000 "chk-X" folders per job in S3.

2. Checkpointing is configured to be triggered every second for all the jobs. Only the interval is set; everything else is left at its default:

executionEnvironment.enableCheckpointing(1000);

Best Regards,
Marc

Re: Possible memory leak in JobManager (Flink 1.10.0)?

Till Rohrmann
Thanks for reporting this issue, Marc. From what you've reported, I think Yun is right and the large memory footprint is caused by CompletedCheckpoints that cannot be removed fast enough. One way to verify this is to enable TRACE logging, because Flink then logs every CompletedCheckpoint as it gets discarded. The line should look like "Executing discard procedure for Checkpoint". The high number of chk-X folders on S3 could be the result of slow discard operations.
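
Once TRACE logging is enabled, the discard messages can be counted to get a rough idea of how many discards actually ran. This is only a sketch: the class name is hypothetical, and the only assumptions about the log are that it is a plain text file and that each discard line contains the message fragment quoted above.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

public class DiscardLogCounter {

    // Message fragment quoted in the thread; the rest of the line is not assumed.
    static final String MARKER = "Executing discard procedure for Checkpoint";

    /** Counts the log lines that contain the discard message. */
    static long countDiscards(List<String> logLines) {
        return logLines.stream()
                .filter(line -> line.contains(MARKER))
                .count();
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: DiscardLogCounter <jobmanager-log-file>");
            return;
        }
        // Reads the whole file into memory; fine for a one-off check.
        List<String> lines = Files.readAllLines(Paths.get(args[0]));
        System.out.println("discard log lines: " + countDiscards(lines));
    }
}
```

Comparing this count with the number of checkpoints completed over the same period indicates whether discards keep up with checkpoint creation.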

If you want, we can also take a look at the logs and, ideally, the heap dump, if you can share them with us.

I think one difference between Flink 1.10.0 and 1.7.2 is that 1.10.0 uses a fixed thread pool for running the I/O operations. The number of threads equals the number of cores. In contrast, Flink 1.7.2 used a fork-join pool with a maximum parallelism of 64. This difference could explain the lower throughput of discard operations, because fewer of them can run in parallel.
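
The effect of the pool size can be illustrated with a back-of-the-envelope model. This is a sketch under loud assumptions, not a measurement: the class and method are hypothetical, the core count (4) and the time per discard (2 s) are made-up values, and the model assumes that each job produces one checkpoint to discard per checkpoint interval and that the fork-join pool behaves like 64 threads.

```java
public class DiscardBacklogModel {

    /**
     * Estimated number of pending discard tasks after `seconds`, assuming a
     * constant arrival rate and a constant duration per discard call.
     */
    static long backlog(double arrivalsPerSecond, int threads,
                        double secondsPerDiscard, long seconds) {
        double drainPerSecond = threads / secondsPerDiscard;
        double growthPerSecond = arrivalsPerSecond - drainPerSecond;
        // A negative growth rate means the pool keeps up, so nothing piles up.
        return Math.max(0L, Math.round(growthPerSecond * seconds));
    }

    public static void main(String[] args) {
        long oneHour = 3600;
        double secondsPerDiscard = 2.0; // hypothetical latency of one discard on S3
        int cores = 4;                  // hypothetical JobManager core count

        // 3 jobs with a 1 s interval: about 3 checkpoints to discard per second.
        System.out.println("fixed pool, 1 s interval:  "
                + backlog(3.0, cores, secondsPerDiscard, oneHour));
        // Same load with 64 parallel discards, as in the older fork-join pool.
        System.out.println("64 threads, 1 s interval:  "
                + backlog(3.0, 64, secondsPerDiscard, oneHour));
        // 3 jobs with a 10 s interval: about 0.3 checkpoints per second.
        System.out.println("fixed pool, 10 s interval: "
                + backlog(0.3, cores, secondsPerDiscard, oneHour));
    }
}
```

With these made-up numbers the small fixed pool falls behind by one task per second, while either a wider pool or a longer checkpoint interval keeps the backlog at zero.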

Cheers,
Till

Re: Possible memory leak in JobManager (Flink 1.10.0)?

Yun Tang
Hi Marc

The leftover 'chk-X' folders, which should have been deleted in the final stage of checkpoint removal, also suggest that the metadata of completed checkpoints that have not been discarded yet is what occupies the memory.

If we take your average checkpoint metadata size as 30 KB, 20000 undiscarded completed checkpoints would occupy about 585 MB of memory, which is close to what you observed.
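
The estimate above can be checked with a few lines of arithmetic. This is a sketch with hypothetical class and method names; it assumes 1 KB = 1024 bytes and uses the 30 KB average from this message together with the ThreadPoolExecutor size reported by Eclipse MAT in the first post (580,468,280 bytes).

```java
import java.util.Locale;

public class CheckpointMetaEstimate {

    /** Memory held by `checkpoints` metadata objects of `avgMetaKiB` each, in MiB. */
    static double retainedMiB(long checkpoints, long avgMetaKiB) {
        return checkpoints * avgMetaKiB / 1024.0;
    }

    /** Converts a byte count to MiB. */
    static double bytesToMiB(long bytes) {
        return bytes / (1024.0 * 1024.0);
    }

    public static void main(String[] args) {
        double estimated = retainedMiB(20_000, 30);
        double observed = bytesToMiB(580_468_280L);
        System.out.println(String.format(Locale.ROOT,
                "estimated: %.1f MiB, observed in heap dump: %.1f MiB",
                estimated, observed));
    }
}
```

The two figures are of the same order of magnitude, which is consistent with queued checkpoint metadata accounting for most of the retained heap.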

From my point of view, a checkpoint interval of one second is really too frequent and would not make much sense in a production environment.

Best
Yun Tang

Re: Possible memory leak in JobManager (Flink 1.10.0)?

Till Rohrmann
For further reference, I've created this issue [1] to track the problem.


Cheers,
Till

Re: Possible memory leak in JobManager (Flink 1.10.0)?

Till Rohrmann
You could also check whether the same problem occurs with Flink 1.7.3. We made the executor change in this bug fix release, so this could help us validate my suspicion.

Cheers,
Till

Re: Possible memory leak in JobManager (Flink 1.10.0)?

Marc LEGER
Hello,

Actually, I agree that I do not need such an aggressive checkpoint interval for my jobs, so I increased it from 1 s to 10 s, and JobManager memory consumption has now been quite stable for 3 days in my Flink 1.10.0 cluster.

Thanks a lot for your help.

Best regards,
Marc