Need Clarity about Checkpoint for Flink-1.12.2

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Need Clarity about Checkpoint for Flink-1.12.2

sudhansu069
Hi Team,

We have recently enabled Check Pointing in our flink job using FSStateBackend pointing to S3 bucket.

Below is the sample code for enabling check pointing though app code and we are using flink version - 1.12.2 .

env.setStateBackend(new FsStateBackend("s3://flinkcheckpointing/job-name/",true));
env.enableCheckpointing(1000);
Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
env.getConfig().addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
config.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);

The query is , do we still need to set the below config in flink-conf.yaml for checkpointing to work.

state.checkpoints.dir: s3://prod-flink-checkpointing/checkpoint-metadata/


Thanks,
Sudhansu
Reply | Threaded
Open this post in threaded view
|

Re: Need Clarity about Checkpoint for Flink-1.12.2

Guowei Ma
Hi Sudhansu,
I think you do not need to set the config in flink-conf.
Best,
Guowei


On Thu, May 13, 2021 at 1:06 PM sudhansu jena <[hidden email]> wrote:
Hi Team,

We have recently enabled Check Pointing in our flink job using FSStateBackend pointing to S3 bucket.

Below is the sample code for enabling check pointing though app code and we are using flink version - 1.12.2 .

env.setStateBackend(new FsStateBackend("s3://flinkcheckpointing/job-name/",true));
env.enableCheckpointing(1000);
Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
env.getConfig().addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
config.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);

The query is , do we still need to set the below config in flink-conf.yaml for checkpointing to work.

state.checkpoints.dir: s3://prod-flink-checkpointing/checkpoint-metadata/


Thanks,
Sudhansu