Unused Checkpointed folder in S3

Unused Checkpointed folder in S3

sudhansu069
Hi Team,

We recently enabled checkpointing in our Flink job using FsStateBackend pointed at an S3 bucket.

Below is the sample code that enables checkpointing for the job. Each time we cancel the job and restart it from the Flink dashboard, a new folder is created alongside the old checkpoint folder in the S3 bucket. Is there a way to automatically remove the old checkpoint folders, given that only the latest one will be used to restore state?

env.setStateBackend(new FsStateBackend("s3://flinkcheckpointing/job-name/", true));
env.enableCheckpointing(1000);
Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
env.getConfig().addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
config.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);


Thanks,
Sudhansu

Re: Unused Checkpointed folder in S3

Arvid Heise-4
Hi Sudhansu,

If you don't set RETAIN_ON_CANCELLATION, the folder should be cleaned up automatically. If you explicitly want to retain the checkpoint, then there is not much Flink can do (or I may have misunderstood you).
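For illustration, a minimal sketch of that configuration (the S3 path reuses the placeholder from your snippet, and the actual job pipeline is omitted):

```java
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointCleanupSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend(new FsStateBackend("s3://flinkcheckpointing/job-name/", true));
        env.enableCheckpointing(1000);
        // DELETE_ON_CANCELLATION: the checkpoint directory is removed when the
        // job is cancelled; checkpoints survive only if the job fails.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
        // ... define and execute the job here ...
    }
}
```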

On Tue, May 11, 2021 at 4:09 PM sudhansu jena <[hidden email]> wrote:

Re: Unused Checkpointed folder in S3

Chesnay Schepler
In reply to this post by sudhansu069
Which Flink version are you using?

On 5/11/2021 4:09 PM, sudhansu jena wrote:

Re: Unused Checkpointed folder in S3

sudhansu069
The Flink version we are using is 1.12.2.


Thanks,
Sudhansu

On Tue, May 11, 2021 at 7:48 PM Chesnay Schepler <[hidden email]> wrote:

Re: Unused Checkpointed folder in S3

sudhansu069
In reply to this post by Arvid Heise-4
Hi Arvid,


The Flink documentation says:

"ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: Delete the checkpoint when the job is cancelled. The checkpoint state will only be available if the job fails."

If I want to resume the job after cancellation, do I have to use ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION, or should I use a savepoint? And if a savepoint is the answer, what is the use of ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION?

Thanks,
Sudhansu

On Tue, May 11, 2021 at 7:47 PM Arvid Heise <[hidden email]> wrote:

Re: Unused Checkpointed folder in S3

Arvid Heise-4
Hi Sudhansu,

the usual flow when you cancel a job gives you two choices:
- Either you want to resume the job later: use stop-with-savepoint and later resume from that savepoint. This also lets you upgrade the job or the Flink version. However, taking a final savepoint takes some time.
- Or you want to cancel quickly and never resume: just cancel.
In both cases, Flink cleans up the checkpoint directory by default.

Now, for certain power users with large state, savepoints are not an option: it can take hours to savepoint terabytes of state, while checkpoints take only minutes. For these users we added RETAIN_ON_CANCELLATION, so they can use checkpoints in the same way we envisioned savepoints to be used. It is also the fastest option for rescaling a job.

We would recommend sticking with savepoints for all manual actions until you hit checkpointing/recovery times that are too high for you; checkpoints would then only be used for recovery after a failure.

Note that we are aware that the current state is confusing and we are planning to improve the whole checkpoint/savepoint story and make it simpler for users in the second half of the year.
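To make the two flows concrete, here is a rough CLI sketch (the job ID, savepoint directory, and jar name are placeholders, not values from this thread):

```shell
# Flow 1: stop with a final savepoint, then resume from it later.
# -p / --savepointPath sets the target directory for the savepoint.
flink stop -p s3://flinkcheckpointing/savepoints/ <jobId>
flink run -s s3://flinkcheckpointing/savepoints/savepoint-xxxx my-job.jar

# Flow 2: plain cancel; with the default DELETE_ON_CANCELLATION the
# checkpoint directory is cleaned up and there is nothing to resume from.
flink cancel <jobId>
```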

On Tue, May 11, 2021 at 4:45 PM sudhansu jena <[hidden email]> wrote: