Hi - I have some questions regarding Flink's checkpointing, specifically related to storing state in the backends.
So let's say an operator in a streaming job is building up some state. When it receives barriers from all of its input streams, does it store *all* of its state to the backend? I think that is what the docs [1] and paper [2] imply, but I want to make sure. In other words, if the operator contains 100MB of state, and the backend is HDFS, does the operator copy all 100MB of state to HDFS during the checkpoint?

Following on this example, say the operator is a global window and is storing some state for each unique key observed in the stream of messages (e.g. userId). Assume that over time, the number of observed unique keys grows, so the size of the state also grows (the window state is never purged). Is the entire operator state at the time of each checkpoint stored to the backend, so that over time the size of the state stored for each checkpoint grows? Or is the state stored to the backend only the state that changed in some way since the last checkpoint?

Are old checkpoint states in the backend ever deleted / cleaned up? That is, if all of the state for checkpoint n in the backend is all that is needed to restore a failed job, then all state for all checkpoints m < n should not be needed any more, right? Can all of those old checkpoints be deleted from the backend? Does Flink do this?

Thanks,
Zach

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/stream_checkpointing.html
[2] http://arxiv.org/abs/1506.08603
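For concreteness, a minimal sketch of the kind of job described above: a keyed operator that keeps one counter per observed userId and never clears it, so the total state (and therefore each full checkpoint) grows with the number of distinct keys. The class and state names are illustrative only, and the three-argument ValueStateDescriptor constructor shown is the Flink 1.0-era form; later releases changed that signature.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Counts events per userId. The per-key state is never cleared, so it grows
// with the number of distinct keys, and every checkpoint snapshots all of it.
public class CountPerUser extends RichFlatMapFunction<String, Long> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        // (name, type, default value) is the Flink 1.0-era constructor;
        // later releases changed this signature.
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count-per-user", Long.class, 0L));
    }

    @Override
    public void flatMap(String userId, Collector<Long> out) throws Exception {
        long updated = count.value() + 1;   // value() returns the default (0) for a new key
        count.update(updated);
        out.collect(updated);
    }
}

Used as userIds.keyBy(...).flatMap(new CountPerUser()), every new userId adds another entry that each subsequent checkpoint has to include.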
Hi Zach,
some answers/comments inline.

Cheers

Konstantin

On 05.04.2016 20:39, Zach Cox wrote:

> So let's say an operator in a streaming job is building up some state.
> When it receives barriers from all of its input streams, does it store
> *all* of its state to the backend? [...] If the operator contains 100MB of
> state, and the backend is HDFS, does the operator copy all 100MB of state
> to HDFS during the checkpoint?

Yes. With the filesystem backend this happens synchronously, with the RocksDB backend the transfer to HDFS is asynchronous.

> Is the entire operator state at the time of each checkpoint stored to the
> backend, so that over time the size of the state stored for each checkpoint
> grows? Or is the state stored to the backend only the state that changed in
> some way since the last checkpoint?

The complete state is checkpointed. Incremental backups are currently not supported, but seem to be on the roadmap.

> Are old checkpoint states in the backend ever deleted / cleaned up? [...]
> Can all of those old checkpoints be deleted from the backend? Does Flink
> do this?

To my knowledge Flink takes care of deleting old checkpoints (I think it says so in the documentation about savepoints). In my experience, though, if a job is cancelled or crashes, the checkpoint files are usually not cleaned up, so some housekeeping might be necessary.

--
Konstantin Knauf * [hidden email] * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082
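To make the backend choice above concrete, a minimal sketch of wiring up checkpointing with either backend. The HDFS path is a placeholder, and in the 1.0 release the RocksDB backend lives in the flink-contrib module, so its class location and constructors may differ in other versions.

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointedJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a checkpoint every 60 seconds.
        env.enableCheckpointing(60000);

        // Filesystem backend: the full snapshot is written synchronously to HDFS
        // at each checkpoint.
        env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

        // Alternative (RocksDB backend, which transfers its snapshot to HDFS
        // asynchronously; constructor details vary by version):
        // env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));

        // ... sources, keyed transformations, sinks ...

        env.execute("checkpointed job");
    }
}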
Hey Zach and Konstantin,
Great questions and answers. We can try to make this more explicit in the docs.

On Tue, Apr 5, 2016 at 8:54 PM, Konstantin Knauf <[hidden email]> wrote:

> To my knowledge Flink takes care of deleting old checkpoints (I think it
> says so in the documentation about savepoints). In my experience, though,
> if a job is cancelled or crashes, the checkpoint files are usually not
> cleaned up, so some housekeeping might be necessary.

Regarding cleanup: currently only the latest successful checkpoint is retained.

On graceful shutdown, all checkpoints should be cleaned up as far as I know. Savepoints always have to be cleaned up manually.

On crashes, the checkpoint state has to be cleaned up manually (if the JVM shutdown hooks did not run).

@Konstantin: did you have lingering state without crashes?

– Ufuk
Hi Ufuk,
I thought so, but I am not sure when and where ;) I will let you know if I come across it again.

Cheers,

Konstantin

On 05.04.2016 21:10, Ufuk Celebi wrote:

> @Konstantin: did you have lingering state without crashes?

--
Konstantin Knauf * [hidden email] * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082
Thanks for the details, Konstantin and Ufuk!

Zach
Hi Zach!
I am working on incremental checkpointing and hope to have it in master in the next weeks.

The current approach is to have a full self-contained checkpoint every once in a while, and incremental checkpoints most of the time. Having a full checkpoint every now and then spares you from re-applying an endless set of deltas on recovery.

Related to that is also making the checkpointing asynchronous, so that normal operations do not see any disruption any more.

Greetings,
Stephan
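For intuition, a toy sketch (plain Java, not Flink internals) of the scheme described above: a periodic full snapshot plus per-interval deltas, where recovery restores the latest full snapshot and re-applies every delta taken after it. Without the occasional full snapshot, the chain of deltas to replay on recovery would grow without bound.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy illustration only: recovery = latest full snapshot + all deltas since it.
public class IncrementalCheckpointSketch {

    static Map<String, Long> recover(Map<String, Long> fullSnapshot,
                                     List<Map<String, Long>> deltasSinceFull) {
        Map<String, Long> state = new HashMap<>(fullSnapshot);
        for (Map<String, Long> delta : deltasSinceFull) {
            state.putAll(delta); // each delta holds only the keys changed in that interval
        }
        return state;
    }

    public static void main(String[] args) {
        Map<String, Long> full = new HashMap<>();
        full.put("userA", 10L);
        full.put("userB", 3L);

        Map<String, Long> delta1 = new HashMap<>();
        delta1.put("userA", 11L);      // only userA changed in this interval

        Map<String, Long> delta2 = new HashMap<>();
        delta2.put("userC", 1L);       // a new key appeared

        List<Map<String, Long>> deltas = new ArrayList<>();
        deltas.add(delta1);
        deltas.add(delta2);

        // Prints the merged state: userA=11, userB=3, userC=1 (map order may vary).
        System.out.println(recover(full, deltas));
    }
}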
|
Hi Stephan - incremental checkpointing sounds really interesting and useful; I look forward to trying it out.

Thanks,
Zach
|