Automatically restore from checkpoint

Arpith P
Hi,

I'm running a Flink job in distributed mode, deployed on YARN. I've enabled externalized checkpoints stored in HDFS, but I don't have read access to the checkpoints folder. Is it possible to restart the Flink job from the last saved checkpoint without passing "-s :checkpointPath"? If not, how can I restore state after the job crashes? Would enabling JobManager HA help me in any way?

Thanks,
Arpith

Re: Automatically restore from checkpoint

David Anderson
If your job crashes, Flink will automatically restart it from the latest checkpoint, without any manual intervention. JobManager HA is only needed for automatic recovery after a JobManager failure.

You only need externalized checkpoints and "-s :checkpointPath" if you want to use checkpoints to manually restart a job after cancelling or stopping it yourself. Also, you don't need read access to the checkpoints yourself, but the job manager and task managers must be able to read (and write) them.
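In code, that setup looks roughly like the sketch below (written against the Flink 1.11-style APIs; the checkpoint interval, the restart strategy values, and the checkpoint location are illustrative and would come from your own configuration):

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Take a checkpoint every 60 seconds; the checkpoint location (e.g. an HDFS path)
// is normally configured via state.checkpoints.dir in flink-conf.yaml.
env.enableCheckpointing(60_000);

// Retain ("externalize") checkpoints so they survive a manual cancel.
env.getCheckpointConfig()
        .enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// On task failures Flink restarts the job and restores the latest checkpoint
// by itself; no "-s" is needed for this case.
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));

Only a manual restart after a cancel or stop then needs something along the lines of "flink run -s hdfs:///<checkpoint-dir>/<job-id>/chk-<n> ..." (the path here is just a placeholder for wherever your checkpoints actually live).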

Regards,
David

Re: Automatically restore from checkpoint

Arpith P
Thanks David. For the manual-restart case, I'm using the following code to find the job ID and checkpoint ID programmatically, so that I can pass the checkpoint path along when restarting with "-s". But it seems I'm missing something, as I'm getting an empty stream of TimestampedFileInputSplit elements.
// Include filter intended to match <job-id>/chk-<n> directories.
// Note: GlobFilePathFilter interprets these patterns as globs, not regular expressions.
GlobFilePathFilter filePathFilter = new GlobFilePathFilter(
        Collections.singletonList("[0-9a-fA-F]{32}/chk-[\\d]+"),
        Collections.<String>emptyList());

TextInputFormat inputFormat = new TextInputFormat(
        new org.apache.flink.core.fs.Path(inputFolderPath));
inputFormat.setNestedFileEnumeration(true);
inputFormat.setFilesFilter(filePathFilter);

// Continuously monitor the checkpoint root and emit splits for matching files.
ContinuousFileMonitoringFunction<String> monitoringFunction =
        new ContinuousFileMonitoringFunction<>(
                inputFormat,
                FileProcessingMode.PROCESS_CONTINUOUSLY,
                inputFolderParallelism,
                pollInterval);

DataStream<TimestampedFileInputSplit> splits = env.addSource(monitoringFunction);
splits.addSink(new PrintSinkFunction<>());
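As a cross-check, the same information could be obtained by listing the chk-* subdirectories of the job's checkpoint directory directly. A rough sketch with Flink's FileSystem API (the class name and the directory argument are just placeholders, and it only works where the checkpoint root is actually readable, which is not the case for my user):

import java.io.IOException;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class LatestCheckpointFinder {

    // Returns the chk-<n> directory with the highest checkpoint id, or null if none exists.
    public static Path findLatestCheckpoint(String jobCheckpointDir) throws IOException {
        Path jobDir = new Path(jobCheckpointDir);   // e.g. hdfs:///<checkpoint-dir>/<job-id>
        FileSystem fs = jobDir.getFileSystem();

        Path latest = null;
        long latestId = -1;
        for (FileStatus status : fs.listStatus(jobDir)) {
            String name = status.getPath().getName();
            if (status.isDir() && name.startsWith("chk-")) {
                long id = Long.parseLong(name.substring("chk-".length()));
                if (id > latestId) {
                    latestId = id;
                    latest = status.getPath();
                }
            }
        }
        return latest;
    }
}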

Arpith
