Discard checkpoint files through a single recursive call

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

Discard checkpoint files through a single recursive call

Jiahui Jiang
Hello Flink!

We are building an infrastructure where we implement our own CompletedCheckpointStore. The read and write to the external storage location of these checkpoints are through HTTP calls to an external service.

Recently we noticed some checkpoint file cleanup performance issue when the job writes out a very high number of checkpoint files per checkpoint. (In our case we had a few hundreds of operators and ran with 16 parallelism)
During checkpoint state discard phase, since the implementation in CompletedCheckpoint discards the state files one by one, we are seeing a very high number of remote calls. Sometimes the deletion fails to catch up with the checkpoint progress.

Given the interface we are given to configure the external storage location for checkpoints is always a `target directory`. Would it be reasonable to expose an implementation of discard() that directly calls disposeStorageLocation with recursive set to true, without iterating over each individual files first? Is there any blockers for that?

Thank you!


links 
Reply | Threaded
Open this post in threaded view
|

Re: Discard checkpoint files through a single recursive call

Guowei Ma
hi, Jiang

I am afraid of misunderstanding what you mean, so can you elaborate on how you want to change it? For example, which interface or class do you want to add a method to?
Although I am not a state expert, as far as I know, due to incremental checkpoints, when CompleteCheckpoint is discarding, it is necessary to call the discardState method of each State.

Best,
Guowei


On Tue, Jun 15, 2021 at 7:37 AM Jiahui Jiang <[hidden email]> wrote:
Hello Flink!

We are building an infrastructure where we implement our own CompletedCheckpointStore. The read and write to the external storage location of these checkpoints are through HTTP calls to an external service.

Recently we noticed some checkpoint file cleanup performance issue when the job writes out a very high number of checkpoint files per checkpoint. (In our case we had a few hundreds of operators and ran with 16 parallelism)
During checkpoint state discard phase, since the implementation in CompletedCheckpoint discards the state files one by one, we are seeing a very high number of remote calls. Sometimes the deletion fails to catch up with the checkpoint progress.

Given the interface we are given to configure the external storage location for checkpoints is always a `target directory`. Would it be reasonable to expose an implementation of discard() that directly calls disposeStorageLocation with recursive set to true, without iterating over each individual files first? Is there any blockers for that?

Thank you!


links 
Reply | Threaded
Open this post in threaded view
|

Re: Discard checkpoint files through a single recursive call

Yun Tang
Hi Jiang,

Please take a look at FLINK-17860 and FLINK-13856 for previous discussion of this problem.


Best
Yun Tang


From: Guowei Ma <[hidden email]>
Sent: Wednesday, June 16, 2021 8:40
To: Jiahui Jiang <[hidden email]>
Cc: [hidden email] <[hidden email]>
Subject: Re: Discard checkpoint files through a single recursive call
 
hi, Jiang

I am afraid of misunderstanding what you mean, so can you elaborate on how you want to change it? For example, which interface or class do you want to add a method to?
Although I am not a state expert, as far as I know, due to incremental checkpoints, when CompleteCheckpoint is discarding, it is necessary to call the discardState method of each State.

Best,
Guowei


On Tue, Jun 15, 2021 at 7:37 AM Jiahui Jiang <[hidden email]> wrote:
Hello Flink!

We are building an infrastructure where we implement our own CompletedCheckpointStore. The read and write to the external storage location of these checkpoints are through HTTP calls to an external service.

Recently we noticed some checkpoint file cleanup performance issue when the job writes out a very high number of checkpoint files per checkpoint. (In our case we had a few hundreds of operators and ran with 16 parallelism)
During checkpoint state discard phase, since the implementation in CompletedCheckpoint discards the state files one by one, we are seeing a very high number of remote calls. Sometimes the deletion fails to catch up with the checkpoint progress.

Given the interface we are given to configure the external storage location for checkpoints is always a `target directory`. Would it be reasonable to expose an implementation of discard() that directly calls disposeStorageLocation with recursive set to true, without iterating over each individual files first? Is there any blockers for that?

Thank you!


links 
Reply | Threaded
Open this post in threaded view
|

Re: Discard checkpoint files through a single recursive call

Jiahui Jiang
Hello Yun and Guowei,

Thanks for the context! Looks like the plan is to have a Flink config flag to enable recursive deletion? Is there any plan to push through this PR in the next release? https://github.com/apache/flink/pull/9602


Thank you so much!
Jiahui

From: Yun Tang <[hidden email]>
Sent: Tuesday, June 15, 2021 10:27 PM
To: Guowei Ma <[hidden email]>; Jiahui Jiang <[hidden email]>
Cc: [hidden email] <[hidden email]>
Subject: Re: Discard checkpoint files through a single recursive call
 
Hi Jiang,

Please take a look at FLINK-17860 and FLINK-13856 for previous discussion of this problem.


Best
Yun Tang


From: Guowei Ma <[hidden email]>
Sent: Wednesday, June 16, 2021 8:40
To: Jiahui Jiang <[hidden email]>
Cc: [hidden email] <[hidden email]>
Subject: Re: Discard checkpoint files through a single recursive call
 
hi, Jiang

I am afraid of misunderstanding what you mean, so can you elaborate on how you want to change it? For example, which interface or class do you want to add a method to?
Although I am not a state expert, as far as I know, due to incremental checkpoints, when CompleteCheckpoint is discarding, it is necessary to call the discardState method of each State.

Best,
Guowei


On Tue, Jun 15, 2021 at 7:37 AM Jiahui Jiang <[hidden email]> wrote:
Hello Flink!

We are building an infrastructure where we implement our own CompletedCheckpointStore. The read and write to the external storage location of these checkpoints are through HTTP calls to an external service.

Recently we noticed some checkpoint file cleanup performance issue when the job writes out a very high number of checkpoint files per checkpoint. (In our case we had a few hundreds of operators and ran with 16 parallelism)
During checkpoint state discard phase, since the implementation in CompletedCheckpoint discards the state files one by one, we are seeing a very high number of remote calls. Sometimes the deletion fails to catch up with the checkpoint progress.

Given the interface we are given to configure the external storage location for checkpoints is always a `target directory`. Would it be reasonable to expose an implementation of discard() that directly calls disposeStorageLocation with recursive set to true, without iterating over each individual files first? Is there any blockers for that?

Thank you!


links 
Reply | Threaded
Open this post in threaded view
|

Re: Discard checkpoint files through a single recursive call

Piotr Nowojski-4
Hi,

Unfortunately at the moment I think there are no plans to push for this. I would suggest you to bump/cast a vote on https://issues.apache.org/jira/browse/FLINK-13856 in order to allows us more accurately prioritise efforts.

Best,
Piotrek

śr., 16 cze 2021 o 05:46 Jiahui Jiang <[hidden email]> napisał(a):
Hello Yun and Guowei,

Thanks for the context! Looks like the plan is to have a Flink config flag to enable recursive deletion? Is there any plan to push through this PR in the next release? https://github.com/apache/flink/pull/9602


Thank you so much!
Jiahui

From: Yun Tang <[hidden email]>
Sent: Tuesday, June 15, 2021 10:27 PM
To: Guowei Ma <[hidden email]>; Jiahui Jiang <[hidden email]>
Cc: [hidden email] <[hidden email]>
Subject: Re: Discard checkpoint files through a single recursive call
 
Hi Jiang,

Please take a look at FLINK-17860 and FLINK-13856 for previous discussion of this problem.


Best
Yun Tang


From: Guowei Ma <[hidden email]>
Sent: Wednesday, June 16, 2021 8:40
To: Jiahui Jiang <[hidden email]>
Cc: [hidden email] <[hidden email]>
Subject: Re: Discard checkpoint files through a single recursive call
 
hi, Jiang

I am afraid of misunderstanding what you mean, so can you elaborate on how you want to change it? For example, which interface or class do you want to add a method to?
Although I am not a state expert, as far as I know, due to incremental checkpoints, when CompleteCheckpoint is discarding, it is necessary to call the discardState method of each State.

Best,
Guowei


On Tue, Jun 15, 2021 at 7:37 AM Jiahui Jiang <[hidden email]> wrote:
Hello Flink!

We are building an infrastructure where we implement our own CompletedCheckpointStore. The read and write to the external storage location of these checkpoints are through HTTP calls to an external service.

Recently we noticed some checkpoint file cleanup performance issue when the job writes out a very high number of checkpoint files per checkpoint. (In our case we had a few hundreds of operators and ran with 16 parallelism)
During checkpoint state discard phase, since the implementation in CompletedCheckpoint discards the state files one by one, we are seeing a very high number of remote calls. Sometimes the deletion fails to catch up with the checkpoint progress.

Given the interface we are given to configure the external storage location for checkpoints is always a `target directory`. Would it be reasonable to expose an implementation of discard() that directly calls disposeStorageLocation with recursive set to true, without iterating over each individual files first? Is there any blockers for that?

Thank you!


links