Hi community,
We are currently using externalized checkpoints so that our jobs can survive abrupt YARN application failures, since a retained checkpoint writes a "_metadata" file into the checkpoint folder, which is essential for cold recovery of the job.
By design, Flink writes completed checkpoints to paths like hdfs:///flink-checkpoints/cfbb875eeabcd46baecf5124f0c33047/chk-1895, hdfs:///flink-checkpoints/cfbb875eeabcd46baecf5124f0c33047/chk-1896, hdfs:///flink-checkpoints/cfbb875eeabcd46baecf5124f0c33047/chk-1897, and so on.
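For context, this is roughly how we enable the retained checkpoints inside the job's main() (the interval and cleanup mode are from our setup, and enableExternalizedCheckpoints assumes a Flink version where that method is still available):

    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000L);  // checkpoint every 60 seconds
    // Retain the checkpoint files (including _metadata) on cancellation/failure so they
    // can be used for cold recovery; state.checkpoints.dir in flink-conf.yaml points to
    // hdfs:///flink-checkpoints in our case.
    env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);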
We originally deployed a tool that periodically polls the REST API of every running Flink job, fetches the latest completed checkpoint path, and saves it to a database for later use. However, the polling interval may be longer than the rate at which old checkpoints are deleted, so at recovery time the saved directory may already have been removed and replaced by newer ones.
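For reference, the polling tool essentially does the following against the JobManager REST API (host, job id, and JSON handling are simplified placeholders; the real tool parses latest.completed.external_path out of the response and writes it to the database):

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class CheckpointPoller {

        // GET /jobs/<jobId>/checkpoints returns the checkpoint statistics of a job,
        // including the external path of the latest completed checkpoint.
        public static String fetchCheckpointStats(String restBase, String jobId) throws Exception {
            URL url = new URL(restBase + "/jobs/" + jobId + "/checkpoints");
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("GET");
            StringBuilder body = new StringBuilder();
            try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
                String line;
                while ((line = in.readLine()) != null) {
                    body.append(line);
                }
            }
            return body.toString(); // raw JSON; the real tool extracts latest.completed.external_path
        }
    }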
We are now wondering whether we could simply list the parent checkpoint folder (say hdfs:///flink-checkpoints/cfbb875eeabcd46baecf5124f0c33047/) and pick the "chk-XXXX" directory with the highest XXXX as the checkpoint to recover from.
But there is one concern to address: how can we make sure that the highest "chk-XXXX" folder is actually complete? After digging through the code of FsCheckpointMetadataOutputStream, it seems the _metadata file could be only halfway uploaded and then never cleaned up, e.g. because of exceptions or a sudden JVM crash.
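To make the idea concrete, the sketch below (using the Hadoop FileSystem API; the base directory is just our example path, and the non-empty "_metadata" check is only a weak heuristic) shows how we would pick the highest chk-XXXX directory. It does not resolve the incomplete-file concern above:

    import java.net.URI;
    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.Optional;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class LatestRetainedCheckpoint {

        // Returns the chk-XXXX directory with the highest XXXX that contains a non-empty
        // _metadata file, e.g. under hdfs:///flink-checkpoints/cfbb875eeabcd46baecf5124f0c33047/
        public static Optional<Path> find(String checkpointBaseDir) throws Exception {
            FileSystem fs = FileSystem.get(new URI(checkpointBaseDir), new Configuration());
            FileStatus[] candidates = fs.globStatus(new Path(checkpointBaseDir, "chk-*"));
            if (candidates == null) {
                return Optional.empty();
            }
            return Arrays.stream(candidates)
                    .filter(FileStatus::isDirectory)
                    .filter(s -> s.getPath().getName().matches("chk-\\d+"))
                    .sorted(Comparator.comparingLong(
                            (FileStatus s) -> Long.parseLong(
                                    s.getPath().getName().substring("chk-".length())))
                            .reversed())
                    .map(FileStatus::getPath)
                    // Require a non-empty _metadata file; this does NOT prove the file was
                    // fully written before a crash, which is exactly our concern.
                    .filter(p -> {
                        try {
                            Path meta = new Path(p, "_metadata");
                            return fs.exists(meta) && fs.getFileStatus(meta).getLen() > 0;
                        } catch (Exception e) {
                            return false;
                        }
                    })
                    .findFirst();
        }
    }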
Another approach we came up with is to register a callback that notifies us once a checkpoint is completed, via CheckpointListener implementations like AcknowledgeOnCheckpoint; however, it is also possible that the notification never reaches our server before the JVM crashes.
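The listener-based approach would look roughly like the sketch below (the CheckpointListener import assumes Flink >= 1.12, and reportToOurService() is just a placeholder for our own HTTP/DB call). The gap is that the JVM can crash after the checkpoint completes but before the callback fires or before our report reaches the server:

    // In Flink >= 1.12 the interface is org.apache.flink.api.common.state.CheckpointListener;
    // older versions have it under org.apache.flink.runtime.state.CheckpointListener.
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.common.state.CheckpointListener;

    public class CheckpointReportingMap<T> extends RichMapFunction<T, T> implements CheckpointListener {

        @Override
        public T map(T value) {
            return value; // pass-through; only the callback below matters for this sketch
        }

        @Override
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            // Called on each parallel instance after checkpoint <checkpointId> completes;
            // we would record "chk-" + checkpointId as the latest usable checkpoint.
            reportToOurService(checkpointId);
        }

        private void reportToOurService(long checkpointId) {
            // placeholder: our own HTTP/DB call, intentionally left empty here
        }
    }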
What do you think is the right and idiomatic way to obtain the path of the last successfully completed externalized checkpoint, so that we can recover from sudden JVM crashes? Thank you very much : )