Shared Checkpoint Cleanup and S3 Lifecycle Policy

Trystan
Hello!

Recently we ran into an issue when checkpointing to S3. Because S3 rate-limits requests per prefix, the /shared directory got slammed and S3 started throttling us. There is no way around this, because /job/checkpoint/:id/shared is all part of one prefix, which is limited to 3,500 PUT/COPY/POST/DELETE or 5,500 GET/HEAD requests per second per prefix.
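A quick way to see whether a checkpoint burst can fit under one prefix is to divide the request rate by the per-prefix limit quoted above. This is a minimal sketch that assumes load spreads evenly across prefixes; the 8,000 requests/s burst is a made-up number, and in practice S3 also repartitions busy prefixes over time, so real throttling is less clear-cut.

```python
import math

# Per-prefix limits quoted in the post (requests per second).
S3_WRITE_LIMIT_PER_PREFIX = 3_500  # PUT/COPY/POST/DELETE
S3_READ_LIMIT_PER_PREFIX = 5_500   # GET/HEAD


def is_throttled(requests_per_second: float, limit_per_prefix: int) -> bool:
    """True if a single prefix would receive more requests than it allows."""
    return requests_per_second > limit_per_prefix


def prefixes_needed(requests_per_second: float, limit_per_prefix: int) -> int:
    """Minimum number of distinct prefixes needed to stay under the limit,
    assuming requests are spread evenly across those prefixes."""
    if requests_per_second <= 0:
        return 1
    return math.ceil(requests_per_second / limit_per_prefix)


# Hypothetical burst: 8,000 uploads/s during a checkpoint, all under shared/.
burst = 8_000
print(is_throttled(burst, S3_WRITE_LIMIT_PER_PREFIX))     # True
print(prefixes_needed(burst, S3_WRITE_LIMIT_PER_PREFIX))  # 3
```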


Jobs also sometimes crash completely, and they leave state lying around when we bring the job up fresh.

Our workarounds have been to 1) reduce the number of TaskManagers, 2) reduce state.backend.rocksdb.checkpoint.transfer.thread.num back to 1 (we had increased it to speed up checkpoints/savepoints), and 3) manually delete tons of old files in the shared directory.
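Workarounds 1 and 2 both shrink a factor in the peak number of simultaneous uploads. A rough sketch of that product, assuming every stateful RocksDB subtask uploads at the same moment with its own pool of transfer threads (the option is documented as a per-stateful-operator thread count); the cluster sizes are hypothetical.

```python
def peak_concurrent_uploads(task_managers: int, stateful_subtasks_per_tm: int,
                            transfer_threads: int) -> int:
    """Rough upper bound on simultaneous file uploads during one checkpoint.

    Assumes every stateful RocksDB subtask uploads at once, each using
    `transfer_threads` threads (state.backend.rocksdb.checkpoint.transfer.thread.num).
    """
    return task_managers * stateful_subtasks_per_tm * transfer_threads


# Hypothetical cluster: 50 TaskManagers with 4 stateful subtasks each.
before = peak_concurrent_uploads(50, 4, transfer_threads=4)
after = peak_concurrent_uploads(50, 4, transfer_threads=1)
print(before, after)  # 800 200
```

Cutting either the TaskManager count or the thread count lowers the burst linearly, which is why both helped, but neither changes the fact that every upload lands under the same prefix.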

My question:
Can we safely apply a Lifecycle Policy to the directory/bucket to remove things? How long is stuff under /shared retained? Is it only for the duration of the oldest checkpoint, or could it carry forward, untouched, from the very first checkpoint to the very last? This shared checkpoint dir/prefix is currently limiting some scalability of our jobs. I don't believe the _entropy_ trick would help this, because the issue is ultimately that there's a single shared directory.
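For reference, an S3 lifecycle expiration rule is purely age-based: it deletes objects a fixed number of days after creation and cannot see whether any checkpoint still references them. The sketch below only builds a rule in the dictionary shape that boto3's `put_bucket_lifecycle_configuration` accepts as `LifecycleConfiguration`; the rule ID and prefix are hypothetical, and the replies in this thread explain why such a rule is risky on a live job's shared/ directory.

```python
def expiration_rule(rule_id: str, prefix: str, days: int) -> dict:
    """Build one S3 lifecycle rule that expires objects under `prefix`
    `days` days after they were created (age only, no reference tracking)."""
    if days < 1:
        raise ValueError("S3 expiration must be at least 1 day")
    return {
        "ID": rule_id,
        "Filter": {"Prefix": prefix},
        "Status": "Enabled",
        "Expiration": {"Days": days},
    }


# Hypothetical prefix of an abandoned job run, NOT a live job's shared/ dir.
lifecycle_config = {"Rules": [expiration_rule("expire-old-run", "old-run/checkpoints/", 7)]}
print(lifecycle_config["Rules"][0]["Expiration"])  # {'Days': 7}
```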

Thank you!
Trystan

Re: Shared Checkpoint Cleanup and S3 Lifecycle Policy

Congxian Qiu
Hi
For the rate limit, could you try entropy injection[1]?
For checkpoints, Flink manages the file lifecycle: it deletes a file once no checkpoint will ever use it again. A file stays in the checkpoint directory as long as any checkpoint that references it is still valid.
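Conceptually, "delete a file once nothing will use it" is reference counting: each retained checkpoint holds a reference to every shared file it uses, and a file is deleted only when its last reference goes away. The toy class below is a hypothetical illustration of that idea (loosely inspired by Flink's shared-state registry), not Flink's actual implementation.

```python
from collections import defaultdict


class SharedFileRegistry:
    """Toy reference counter for shared checkpoint files (hypothetical)."""

    def __init__(self) -> None:
        self._refs: dict[str, int] = defaultdict(int)
        self.deleted: list[str] = []

    def register_checkpoint(self, files: set[str]) -> None:
        # A completed checkpoint adds one reference to every file it uses.
        for f in files:
            self._refs[f] += 1

    def discard_checkpoint(self, files: set[str]) -> None:
        # A subsumed checkpoint drops its references; a file with no
        # remaining references can never be used again, so it is deleted.
        for f in sorted(files):
            self._refs[f] -= 1
            if self._refs[f] == 0:
                del self._refs[f]
                self.deleted.append(f)

    def live_files(self) -> set[str]:
        return set(self._refs)


reg = SharedFileRegistry()
chk1 = {"a.sst", "b.sst"}
chk2 = {"a.sst", "c.sst"}  # reuses a.sst from chk1
reg.register_checkpoint(chk1)
reg.register_checkpoint(chk2)
reg.discard_checkpoint(chk1)      # chk1 is subsumed
print(reg.deleted)                # ['b.sst']
print(sorted(reg.live_files()))   # ['a.sst', 'c.sst']
```

Note that a.sst survives chk1's removal because chk2 still references it, even though it was created by the older checkpoint.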


(In reply to Trystan, Thu, May 7, 2020, 2:46 AM)

Re: Shared Checkpoint Cleanup and S3 Lifecycle Policy

Trystan
Thanks Congxian! To make sure I'm understanding correctly: if I retain 3 incremental checkpoints (say, one every minute) and I've just completed checkpoint 10, then anything in shared is from checkpoints 8 and 9 only. So anything older than ~3 minutes can safely be deleted? The state from checkpoint 5 doesn't live on in the shared directory - at all?

I ask because we have run into cases where we end up abandoning the state, and Flink does not clean up state from, say, a previous iteration of the job if you don't restore state. We need to remove these files automatically, but I want to be sure that I don't blow away older files in the shared dir from earlier, subsumed checkpoints - but you are saying that isn't possible, and that all subsumed checkpoints will have their /shared state rewritten or cleaned up as needed, correct?

As for entropy, where would you suggest to use it? My understanding is that I don't control anything beyond the checkpoint directory, and since shared is in that directory I can't put entropy inside the shared directory itself (which is what I would need).

Thanks,
Trystan

(In reply to Congxian Qiu, Wed, May 6, 2020, 7:31 PM)

Re: Shared Checkpoint Cleanup and S3 Lifecycle Policy

Congxian Qiu
Hi

Yes, the checkpoint directory should only contain files used by checkpoints 8, 9, and 10. However, you cannot delete files just because they were created more than 3 minutes ago: checkpoints 8, 9, and 10 may reuse files created by earlier checkpoints, which is how incremental checkpointing works[1].
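The reuse problem can be shown with a small simulation. It is a hypothetical model, not RocksDB's real compaction behavior: checkpoint 1 writes a base SST file that is never compacted away, and each checkpoint i adds one delta file while older deltas are assumed merged out. With 3 retained checkpoints after checkpoint 10, a rule that deletes everything created before the retained window removes a file that is still live.

```python
def referenced_by_retained(num_checkpoints: int, retained: int) -> dict[str, int]:
    """Map each file referenced by the retained checkpoints to the
    checkpoint that created it (hypothetical incremental model)."""
    created: dict[str, int] = {"base-1.sst": 1}
    contents: dict[int, set[str]] = {}
    for i in range(1, num_checkpoints + 1):
        delta = f"delta-{i}.sst"
        created[delta] = i
        contents[i] = {"base-1.sst", delta}  # base is reused forever
    oldest = num_checkpoints - retained + 1
    live = set().union(*(contents[i] for i in range(oldest, num_checkpoints + 1)))
    return {f: created[f] for f in live}


def age_rule_victims(live: dict[str, int], num_checkpoints: int, retained: int) -> list[str]:
    """Live files an 'older than the retained window' rule would delete."""
    oldest = num_checkpoints - retained + 1
    return sorted(f for f, born in live.items() if born < oldest)


live = referenced_by_retained(10, 3)
print(sorted(live))                   # ['base-1.sst', 'delta-10.sst', 'delta-8.sst', 'delta-9.sst']
print(age_rule_victims(live, 10, 3))  # ['base-1.sst']
```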

You can also read the documentation on the checkpoint directory structure[2] for more information; quoting it here:
> The SHARED directory is for state that is possibly part of multiple checkpoints, TASKOWNED is for state that must never be dropped by the JobManager, and EXCLUSIVE is for state that belongs to one checkpoint only.
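When scripting any inspection of the bucket, it helps to sort keys into these three kinds. A minimal sketch, assuming the documented layout `<checkpoint-dir>/<job-id>/{shared, taskowned, chk-<n>}/...` and no entropy injection (an injected random segment would change the root); the example keys are hypothetical.

```python
from pathlib import PurePosixPath


def classify(key: str, checkpoint_root: str) -> str:
    """Classify a key laid out as <checkpoint_root>/<job-id>/<dir>/<file>.

    Returns 'shared', 'taskowned', 'exclusive' (chk-<n>/ directories),
    or 'other' for anything that does not match that layout.
    """
    try:
        parts = PurePosixPath(key).relative_to(checkpoint_root).parts
    except ValueError:
        return "other"  # not under the checkpoint root at all
    if len(parts) < 3:
        return "other"  # need at least <job-id>/<dir>/<file>
    directory = parts[1]
    if directory in ("shared", "taskowned"):
        return directory
    if directory.startswith("chk-") and directory[4:].isdigit():
        return "exclusive"
    return "other"


print(classify("checkpoints/0000/shared/a.sst", "checkpoints"))    # shared
print(classify("checkpoints/0000/chk-10/_metadata", "checkpoints"))  # exclusive
```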

For entropy injection, enable it as described in the documentation. It replaces the entropy key in the path with a random string of the configured length, so the files no longer all sit under the same prefix.
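As we understand it, this also answers the shared/ concern: shared/ lives under the checkpoint directory, so if that directory contains the key (for example `state.checkpoints.dir: s3://my-bucket/_entropy_/checkpoints` with `s3.entropy.key: _entropy_` and `s3.entropy.length: 4`), each data file under shared/ gets its own random segment near the start of the key. The sketch below imitates that replacement for illustration only; the lowercase-plus-digits alphabet and the helper name are assumptions, not Flink's code.

```python
import random
import string

ENTROPY_KEY = "_entropy_"  # example value of s3.entropy.key
ENTROPY_LENGTH = 4         # example value of s3.entropy.length


def inject_entropy(path: str, key: str, length: int, rng: random.Random) -> str:
    """Replace the first occurrence of `key` with `length` random characters
    (hypothetical helper; the alphabet is an assumption)."""
    if key not in path:
        return path
    alphabet = string.ascii_lowercase + string.digits
    token = "".join(rng.choice(alphabet) for _ in range(length))
    return path.replace(key, token, 1)


rng = random.Random(42)
base = "s3://my-bucket/_entropy_/checkpoints/0000/shared/"
paths = [inject_entropy(base + f"file-{i}", ENTROPY_KEY, ENTROPY_LENGTH, rng) for i in range(3)]
for p in paths:
    print(p)  # each shared file gets a different segment right after the bucket
```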


(In reply to Trystan, Thu, May 7, 2020, 12:54 PM)

Re: Shared Checkpoint Cleanup and S3 Lifecycle Policy

Trystan
Aha, so incremental checkpoints can rely on state from arbitrarily old checkpoints, regardless of how many checkpoints are retained. The documentation wasn't entirely clear about this. One would assume that if you retain 3 checkpoints, anything older than the third is irrelevant, but that's evidently not true. So it is never safe to delete any files in /shared, because we can't tell which files belong to the current job (and may have lived on since checkpoint 1, even though we're on checkpoint 10 and only "retain" 3) and which ones have been abandoned altogether (by a previous run of the job where we didn't restore state).

This is really unfortunate: it can lead to a huge number of files accumulating in S3 with no way to tell which ones to delete, especially if the job ID stays the same (in job mode, it's all zeros). So this shared state lives on forever, and there is no way to ever clean it up. I am surprised this hasn't been a problem for anyone else. Maybe I should just file a feature request, at least to find some way to clean up these directories.

I appreciate your patience and help, thank you so much!

Trystan

(In reply to Congxian Qiu, Thu, May 7, 2020, 7:15 PM)

Re: Shared Checkpoint Cleanup and S3 Lifecycle Policy

Congxian Qiu
Hi

Currently, it is hard to determine which files in the shared folder can be deleted safely; the ground truth is in the checkpoint metadata file. I've created an issue[1] for such a feature.
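Given that ground truth, a cleanup tool would compare what is in shared/ against what the retained checkpoints' metadata references. The sketch below shows only that comparison: it assumes you already have the referenced keys (parsing the metadata file is not shown) and a listing of keys with last-modified times, and it adds a grace period so files written by an in-progress checkpoint are not flagged. All names and numbers are hypothetical.

```python
def orphan_candidates(listed: dict[str, float], referenced: set[str],
                      now: float, grace_seconds: float) -> list[str]:
    """Keys under shared/ that no retained checkpoint references and that
    are older than `grace_seconds`.

    `listed` maps object key -> last-modified time (epoch seconds), e.g. from
    a bucket listing; `referenced` is assumed to come from the metadata of
    every retained checkpoint that could still be restored.
    """
    return sorted(
        key for key, mtime in listed.items()
        if key not in referenced and now - mtime > grace_seconds
    )


listed = {"shared/a": 0.0, "shared/b": 0.0, "shared/c": 950.0}
referenced = {"shared/a"}
print(orphan_candidates(listed, referenced, now=1000.0, grace_seconds=100.0))  # ['shared/b']
```

Here shared/c is unreferenced but too new to judge, so it is kept. If several job runs share the same path (for example an all-zeros job ID), `referenced` would have to cover every run you might still restore from.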


(In reply to Trystan, Fri, May 8, 2020, 1:05 PM)