Controlling the amount of checkpoint files


Boris Lublinsky
Is there a way to limit the number of checkpoint files?
The parameter I set, state.checkpoints.num-retained: 5, does not seem to have any effect. Is there anything else I can set to prevent unbounded growth of the checkpoint data?

Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/
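The intent of state.checkpoints.num-retained is that only the N most recently completed checkpoints are kept, and older ones are discarded as newer ones complete. The plain-Scala model below is a sketch of that policy only. RetainedCheckpoints is a hypothetical class, not part of Flink, and it models the checkpoints of a single running job.

```scala
import scala.collection.mutable

// Hypothetical model of a "keep the N most recent completed checkpoints" policy,
// as described for state.checkpoints.num-retained. This is NOT Flink code.
final class RetainedCheckpoints(maxRetained: Int) {
  require(maxRetained >= 1, "at least one checkpoint must be retained")

  private val retained = mutable.Queue.empty[Long]

  /** Records a newly completed checkpoint and returns the IDs that fell out of
    * the retention window (i.e. whose files would become eligible for deletion). */
  def complete(checkpointId: Long): List[Long] = {
    retained.enqueue(checkpointId)
    val dropped = mutable.ListBuffer.empty[Long]
    while (retained.size > maxRetained) dropped += retained.dequeue()
    dropped.toList
  }

  /** Checkpoint IDs currently retained, oldest first. */
  def current: List[Long] = retained.toList
}

object RetainedCheckpointsDemo extends App {
  val policy = new RetainedCheckpoints(maxRetained = 5)
  val dropped = (1L to 8L).flatMap(policy.complete).toList
  println(s"retained: ${policy.current}") // retained: List(4, 5, 6, 7, 8)
  println(s"dropped:  $dropped")          // dropped:  List(1, 2, 3)
}
```

With maxRetained = 5 and eight completed checkpoints, only checkpoints 4 through 8 stay in the window.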


Re: Controlling the amount of checkpoint files

Congxian Qiu
Hi Boris,
With the configuration you gave, you can try reducing the parallelism of the operators that hold state.

Best,
Congxian
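The parallelism suggestion works because the number of files written per checkpoint grows with the number of parallel subtasks that hold state. The sketch below is a rough, hypothetical estimate, not a Flink API. It assumes that, with full snapshots on the FsStateBackend, each stateful subtask writes about one file per checkpoint, plus one _metadata file per checkpoint, and that state.checkpoints.num-retained checkpoints are kept. Small state can be inlined into _metadata, so real counts may be lower. The operator names and parallelism values are made up for illustration.

```scala
// Hypothetical back-of-the-envelope estimate, NOT a Flink API.
// Assumption: with full (non-incremental) snapshots, each parallel subtask of a
// stateful operator writes roughly one state file per checkpoint, plus one
// _metadata file per checkpoint. Small state may be inlined into _metadata.
final case class StatefulOperator(name: String, parallelism: Int)

object CheckpointFileEstimate {
  def filesPerCheckpoint(ops: Seq[StatefulOperator]): Int =
    ops.map(_.parallelism).sum + 1 // +1 for the _metadata file

  def filesOnDisk(ops: Seq[StatefulOperator], numRetained: Int): Int =
    numRetained * filesPerCheckpoint(ops)

  def main(args: Array[String]): Unit = {
    // Two made-up stateful operators, each running with parallelism 8.
    val ops = Seq(StatefulOperator("window", 8), StatefulOperator("dedup", 8))
    println(filesOnDisk(ops, numRetained = 5))                              // 85
    println(filesOnDisk(ops.map(_.copy(parallelism = 2)), numRetained = 5)) // 25
  }
}
```

Under these assumptions, dropping the parallelism from 8 to 2 shrinks five retained checkpoints from about 85 files to about 25, but it does not change how many checkpoints are retained.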


On Mon, Jun 10, 2019 at 9:43 PM, Boris Lublinsky <[hidden email]> wrote:
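Here is the code that enables checkpointing:

import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Enable checkpointing: exactly-once, at most one checkpoint in flight
env.enableCheckpointing(60000) // 1 min
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
// Keep the externalized checkpoint when the job is cancelled (it must then be cleaned up manually)
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
val checkpointingBackend = new FsStateBackend("file:///flink/checkpoints", true) // true = asynchronous snapshots
env.setStateBackend(checkpointingBackend)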



On Jun 10, 2019, at 1:07 AM, Congxian Qiu <[hidden email]> wrote:

Hi

Which state backend (heap or RocksDB) and which checkpoint mode (full or incremental snapshots) do you use?

Best,
Congxian


On Tue, Jun 4, 2019 at 6:45 AM, Boris Lublinsky <[hidden email]> wrote:
Is there a way to limit the number of checkpoint files?
The parameter I set, state.checkpoints.num-retained: 5, does not seem to have any effect. Is there anything else I can set to prevent unbounded growth of the checkpoint data?


Re: Controlling the amount of checkpoint files

rmetzger0
Hey Boris,

I think the problem is that you are using externalized checkpoints:
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
Your checkpoints are retained in both the failure and the cancellation case, so the checkpoint files will grow indefinitely.
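Flink's documentation notes that with RETAIN_ON_CANCELLATION the checkpoint state has to be cleaned up manually after cancellation. The following is a minimal Scala sketch of such a cleanup, assuming the directory layout <checkpoint-dir>/<job-id>/chk-<n>/ and that the job is no longer running. PruneRetainedCheckpoints is a hypothetical helper, not a Flink tool. It keeps the `keep` highest-numbered chk-<n> directories and leaves shared/ and taskowned/ alone, because retained (for example incremental) checkpoints may still reference files there.

```scala
import java.io.File
import java.nio.file.{Files, Path, Paths}

// Hypothetical cleanup helper, NOT part of Flink. Assumes the layout
// <checkpoint-dir>/<job-id>/chk-<n>/ and that the job is no longer running.
// shared/ and taskowned/ are never touched: retained checkpoints may reference them.
object PruneRetainedCheckpoints {
  private val ChkDir = """chk-(\d+)""".r

  // listFiles() returns null on an I/O error or if `dir` is not a directory.
  private def children(dir: File): List[File] =
    Option(dir.listFiles()).map(_.toList).getOrElse(Nil)

  // Deletes a file or a whole directory tree, children first.
  private def deleteRecursively(f: File): Unit = {
    if (f.isDirectory) children(f).foreach(deleteRecursively)
    Files.delete(f.toPath)
  }

  /** Keeps the `keep` highest-numbered chk-<n> directories under `jobDir`,
    * deletes the others, and returns the names of the deleted directories. */
  def prune(jobDir: Path, keep: Int): List[String] = {
    require(keep >= 0, "keep must be non-negative")
    val numbered = children(jobDir.toFile).flatMap { f =>
      f.getName match {
        case ChkDir(n) if f.isDirectory => Some(n.toLong -> f)
        case _                          => None
      }
    }
    val toDelete = numbered.sortBy { case (n, _) => -n }.drop(keep)
    toDelete.foreach { case (_, dir) => deleteRecursively(dir) }
    toDelete.map { case (_, dir) => dir.getName }.sorted
  }

  def main(args: Array[String]): Unit = {
    // Usage: PruneRetainedCheckpoints <job-checkpoint-dir> <number-to-keep>
    prune(Paths.get(args(0)), args(1).toInt).foreach(name => println(s"deleted $name"))
  }
}
```

Sorting by the numeric suffix rather than by name keeps chk-10 ahead of chk-9. Deleting only whole chk-<n> directories avoids removing files that a kept checkpoint might still need.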



On Wed, Jun 12, 2019 at 8:01 AM Congxian Qiu <[hidden email]> wrote:
Hi Boris,
With the configuration you gave, you can try reducing the parallelism of the operators that hold state.

Best,
Congxian



Re: Controlling the amount of checkpoint files

Boris Lublinsky
So if you have externalized checkpoints, they are never purged?
The issue is that if your state size is rather large, this seems to be the only option.


On Jun 15, 2019, at 3:39 AM, Robert Metzger <[hidden email]> wrote:

Hey Boris,

I think the problem is that you are using externalized checkpoints:
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
Your checkpoints are retained in both the failure and the cancellation case, so the checkpoint files will grow indefinitely.


