Questions about checkpoints/savepoints


Questions about checkpoints/savepoints

vipul singh
Hello,

I have some confusion about checkpoints vs savepoints, and how to use them effectively in my application.

I am working on an application which relies on Flink's fault-tolerance mechanism to ensure exactly-once semantics. I have enabled external checkpointing in my application as below:

env.enableCheckpointing(CHECKPOINT_TIME_MS)

env.setStateBackend(new RocksDBStateBackend(CHECKPOINT_LOCATION))

env.getCheckpointConfig.setMinPauseBetweenCheckpoints(CHECKPOINT_MIN_PAUSE)
env.getCheckpointConfig.setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(CHECKPOINT_MAX_CONCURRENT)

Please correct me if I am wrong, but the above ensures that if the application crashes, it is able to recover from the last known checkpoint. This, however, won't work if we cancel the application (for new deployments/restarts).

Reading the linked savepoint documentation hints that it is good practice to take savepoints at regular intervals (via cron etc.) so that the application can be restarted from a last known location. It also points to using the command line option (-s) to cancel an application, so that the application stops after taking a savepoint. Based on the above understanding I have some questions below:

Questions:
  1. It seems to me that checkpoints can be treated as Flink's internal recovery mechanism, while savepoints act more as user-defined recovery points. Would that be a correct assumption?
  2. While cancelling an application with the -s option, it specifies the savepoint location. Is there a way during application startup to identify the last known savepoint from a folder by itself, and restart from there? Since I am saving my savepoints on S3, I want to avoid issues arising from listing the S3 folder due to S3's read-after-write consistency.
  3. Suppose my application has a checkpoint at time t1, and say I cancel the application sometime before the next checkpoint (say at t1+x). If I start the application without specifying a savepoint, it will start from the last known checkpoint (at t1), which won't have the latest application state, since I had cancelled the application. Would this be a correct assumption?
  4. Would using ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION be the same as manually taking regular savepoints?

Please let me know. 

Thanks,
Vipul


Re: Questions about checkpoints/savepoints

Stefan Richter
Hi,

I have answered your questions inline:
  1. It seems to me that checkpoints can be treated as Flink's internal recovery mechanism, while savepoints act more as user-defined recovery points. Would that be a correct assumption?
You could see it that way, but I would describe savepoints more as user-defined *restart* points than *recovery* points. Please take a look at my answers in this thread, because they cover most of your questions:


  2. While cancelling an application with the -s option, it specifies the savepoint location. Is there a way during application startup to identify the last known savepoint from a folder by itself, and restart from there? Since I am saving my savepoints on S3, I want to avoid issues arising from listing the S3 folder due to S3's read-after-write consistency.
I don't think that this feature exists; you have to specify the savepoint.

  3. Suppose my application has a checkpoint at time t1, and say I cancel the application sometime before the next checkpoint (say at t1+x). If I start the application without specifying a savepoint, it will start from the last known checkpoint (at t1), which won't have the latest application state, since I had cancelled the application. Would this be a correct assumption?
If you restart a canceled application it will not consider checkpoints. They are only considered in recovery on failure. You need to specify a savepoint or externalized checkpoint for restarts to make explicit that you intend to restart a job, and not to run a new instance of the job.

  4. Would using ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION be the same as manually taking regular savepoints?
Not the same, because checkpoints and savepoints differ in certain aspects, but both methods leave you with something that survives job cancellation and can be used to restart from a certain state.
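
For completeness: retained (externalized) checkpoints have to be enabled explicitly on the CheckpointConfig; they are not turned on just by pointing the state backend at S3. A minimal Scala sketch against the 1.3-style API (bucket, path and interval values are placeholders):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object RetainedCheckpointsExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // checkpoint every 60s; state snapshots go to the RocksDB backend location
    env.enableCheckpointing(60000L)
    env.setStateBackend(new RocksDBStateBackend("s3://<bucket>/<path>/checkpoints"))

    // keep the checkpoint (and its metadata file under state.checkpoints.dir)
    // when the job is cancelled, so it can later be passed to `flink run -s <path>`
    env.getCheckpointConfig.enableExternalizedCheckpoints(
      ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    // ... build and execute the job ...
  }
}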

Best,
Stefan


Re: Questions about checkpoints/savepoints

vipul singh
Thanks Stefan for the answers above. These are really helpful.

I have a few followup questions:
  1. I see that my savepoints are created in a folder which has a _metadata file and another file. Looking at the code, it seems like the metadata file contains task states, operator state and master states. What is the purpose of the other file in the savepoint folder? My guess is that it is a checkpoint data file?
  2. I am planning to use S3 as my state backend, so I want to ensure that application restarts are not affected by S3's read-after-write consistency (if I use S3 as a savepoint backend). I am curious how Flink restores data from the _metadata file and the other file. Does the _metadata file contain the paths to these other files, or would it do a listing on the S3 folder?

Please let me know,

Thanks,
Vipul


Re: Questions about checkpoints/savepoints

Aljoscha Krettek
Hi,

Flink does not rely on file system operations to list contents; all necessary file paths are stored in the metadata file, as you guessed. This is the reason savepoints also work with file systems that "only" have read-after-write consistency.

Best,
Aljoscha


Re: Questions about checkpoints/savepoints

vipul singh
Thanks Aljoscha for the answer above.

I am experimenting with savepoints and checkpoints on my end, so that we can build a fault-tolerant application with exactly-once semantics.

I have been able to test various scenarios, but have doubts about one use case.

My app is running on an EMR cluster, and I am trying to test the case where the EMR cluster is terminated. I have read that state.checkpoints.dir is responsible for storing metadata information and links to the data files in state.backend.fs.checkpointdir.

For my application I have configured both state.backend.fs.checkpointdir and state.checkpoints.dir.

Also I have the following in my main app:
env.enableCheckpointing(CHECKPOINT_TIME_MS)

val CHECKPOINT_LOCATION = s"s3://${config.s3Bucket}/${config.s3BasePath}/${config.s3ExtensionPath}/checkpoints/rocksdb"

val backend: RocksDBStateBackend = new RocksDBStateBackend(CHECKPOINT_LOCATION)

env.setStateBackend(backend)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(CHECKPOINT_MIN_PAUSE)
env.getCheckpointConfig.setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(CHECKPOINT_MAX_CONCURRENT)

In the application startup logs I can see the state.backend.fs.checkpointdir and state.checkpoints.dir values being loaded. However, when a checkpoint happens I don't see any content in the metadata dir. Is there something I am missing? Please let me know. I am using Flink version 1.3.

Thanks,
Vipul




Re: Questions about checkpoints/savepoints

Tony Wei


Re: Questions about checkpoints/savepoints

vipul singh
Thanks Tony, that was the issue. I was thinking that when we use RocksDB and provide an S3 path, it uses externalized checkpoints by default. Thanks so much!

I have one followup question. Say in the above case I terminate the cluster; since the metadata is on S3 and not on local storage, does Flink avoid the read-after-write consistency issue of S3? Would that be a valid concern, or is that case handled for externalized checkpoints as well, so that no file system listing is involved when retrieving externalized checkpoints from S3?

Thanks,
Vipul




Re: Questions about checkpoints/savepoints

Aljoscha Krettek
Hi,

That distinction with externalised checkpoints is a bit of a pitfall, and I'm hoping that we can actually get rid of it in the next version or the version after that. With that change, all checkpoints would always be externalised, since it's not really any noticeable overhead.

Regarding read-after-write consistency, you should be fine, since the "externalised checkpoint", i.e. the metadata, is only one file. If you know the file path (either from the Flink dashboard or by looking at the S3 bucket) you can restore from it.

Best,
Aljoscha


Re: Questions about checkpoints/savepoints

vipul singh
Thanks Aljoscha for the explanations. I was able to recover from the last externalized checkpoint by using flink run -s <metadata file> <options>.

I am curious, are there any options to save the metadata file name to some other place like Dynamo etc. at the moment? The reason I am asking is that, for the launcher code we are writing, we want to ensure that if a Flink job crashes we can restart it from the last known externalized checkpoint. In the present scenario we have to list the contents of the S3 bucket which stores the metadata to find the last metadata file before the failure, and there might be a window where we run into the read-after-write consistency issue of S3. Thoughts?
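
One workaround I am considering is to have the launcher poll the JobManager's checkpoint monitoring REST endpoint and record the latest external path in our own store (Dynamo, ZK, ...). A rough, dependency-free sketch; it assumes the /jobs/<jobId>/checkpoints response exposes an external_path field for retained checkpoints, so please correct me if that is not the case in 1.3:

import scala.io.Source

object CheckpointPathRecorder {

  // Fetch the checkpoint stats for a job and pull out the last external_path, if any.
  // Crude string extraction keeps the sketch dependency-free; a real launcher would
  // use a proper JSON parser.
  def latestExternalPath(jobManagerUrl: String, jobId: String): Option[String] = {
    val json = Source.fromURL(s"$jobManagerUrl/jobs/$jobId/checkpoints").mkString
    val marker = "\"external_path\":\""
    val start = json.lastIndexOf(marker)
    if (start < 0) None
    else {
      val from = start + marker.length
      Some(json.substring(from, json.indexOf("\"", from)))
    }
  }

  def main(args: Array[String]): Unit = {
    // hypothetical values: substitute the actual JobManager address and job id
    latestExternalPath("http://localhost:8081", "<job-id>") match {
      case Some(path) => println(s"store this path durably (Dynamo, ZK, ...): $path")
      case None       => println("no retained checkpoint path reported yet")
    }
  }
}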

Thanks,
Vipul

Re: Questions about checkpoints/savepoints

vipul singh
As a followup to the above, is there a way to get the last checkpoint metadata location inside the notifyCheckpointComplete method? I tried poking around, but didn't see a way to achieve this. Or is there any other way to save the actual checkpoint metadata location into a datastore (DynamoDB etc.)?

We are looking to save the savepoint/externalized checkpoint metadata location in some storage, so that we can pass this information to the flink run command during recovery (thereby removing the possibility of any read-after-write consistency issues arising from listing file paths).
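
For reference, this is roughly all I can see from inside the job: a minimal sketch against the CheckpointListener interface (1.3 package names), which only hands over the checkpoint id, not the metadata path:

import org.apache.flink.runtime.state.CheckpointListener
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

// A sink that is told when a checkpoint completes. Only the checkpoint id is
// passed in; the external metadata path would still have to be looked up
// elsewhere (monitoring REST API, the configured state.checkpoints.dir, ...).
class CheckpointAwareSink[T] extends RichSinkFunction[T] with CheckpointListener {

  override def invoke(value: T): Unit = {
    // normal sink logic goes here
  }

  override def notifyCheckpointComplete(checkpointId: Long): Unit = {
    // hypothetical hook: persist the id (plus any externally known path) to DynamoDB/ZK
    println(s"checkpoint $checkpointId completed")
  }
}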

Thanks,
Vipul


Re: Questions about checkpoints/savepoints

Hao Sun
Hi team, I have a similar use case; do we have any answers on this?
When we trigger a savepoint, can we store that information in ZK as well,
so that I can avoid S3 file listing and do not have to use other external services?
