Checkpoint state stored in backend, and deleting old checkpoint state


Zach Cox
Hi - I have some questions regarding Flink's checkpointing, specifically related to storing state in the backends.

So let's say an operator in a streaming job is building up some state. When it receives barriers from all of its input streams, does it store *all* of its state to the backend? I think that is what the docs [1] and paper [2] imply, but I want to make sure. In other words, if the operator contains 100MB of state, and the backend is HDFS, does the operator copy all 100MB of state to HDFS during the checkpoint?

Following on this example, say the operator is a global window and is storing some state for each unique key observed in the stream of messages (e.g. userId). Assume that over time, the number of observed unique keys grows, so the size of the state also grows (the window state is never purged). Is the entire operator state at the time of each checkpoint stored to the backend? So that over time, the size of the state stored for each checkpoint to the backend grows? Or is the state stored to the backend somehow just the state that changed in some way since the last checkpoint?

Are old checkpoint states in the backend ever deleted / cleaned up? That is, if all of the state for checkpoint n in the backend is all that is needed to restore a failed job, then all state for all checkpoints m < n should not be needed any more, right? Can all of those old checkpoints be deleted from the backend? Does Flink do this?

Thanks,
Zach


Re: Checkpoint state stored in backend, and deleting old checkpoint state

snntr
Hi Zach,

some answers/comments inline.

Cheers

Konstantin

On 05.04.2016 20:39, Zach Cox wrote:
> Hi - I have some questions regarding Flink's checkpointing, specifically
> related to storing state in the backends.
>
> So let's say an operator in a streaming job is building up some state.
> When it receives barriers from all of its input streams, does it store
> *all* of its state to the backend? I think that is what the docs [1] and
> paper [2] imply, but want to make sure. In other words, if the operator
> contains 100MB of state, and the backend is HDFS, does the operator copy
> all 100MB of state to HDFS during the checkpoint?

Yes. With the filesystem backend this happens synchronously; with the
RocksDB backend, the transfer to HDFS is asynchronous.

> Following on this example, say the operator is a global window and is
> storing some state for each unique key observed in the stream of
> messages (e.g. userId). Assume that over time, the number of observed
> unique keys grows, so the size of the state also grows (the window state
> is never purged). Is the entire operator state at the time of each
> checkpoint stored to the backend? So that over time, the size of the
> state stored for each checkpoint to the backend grows? Or is the state
> stored to the backend somehow just the state that changed in some way
> since the last checkpoint?

The complete state is checkpointed. Incremental backups are currently
not supported, but seem to be on the roadmap.
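
To make the cost of full checkpoints concrete, here is a toy model in Python. It is not Flink code, and `process` and `full_checkpoint` are hypothetical names. It keeps one counter per key, as in the global-window example, and serializes the complete state at every checkpoint, so the snapshot keeps growing even though the number of keys touched per interval stays about the same.

```python
import json

def process(state, user_id):
    """Update per-key state: count events per user (state is never purged)."""
    state[user_id] = state.get(user_id, 0) + 1

def full_checkpoint(state):
    """Serialize the complete state, as a full (non-incremental) checkpoint would."""
    return json.dumps(state, sort_keys=True).encode("utf-8")

state = {}
snapshot_sizes = []
changed_per_interval = []

# Three checkpoint intervals; each sees 100 new keys plus one repeat key.
for interval in range(3):
    changed = set()
    for i in range(100):
        key = f"user-{interval * 100 + i}"
        process(state, key)
        changed.add(key)
    process(state, "user-0")
    changed.add("user-0")
    snapshot_sizes.append(len(full_checkpoint(state)))
    changed_per_interval.append(len(changed))

print(snapshot_sizes)        # grows every interval: the whole state is rewritten
print(changed_per_interval)  # keys touched per interval stay roughly constant
```

The gap between the two printed lists is what an incremental scheme would save.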

> Are old checkpoint states in the backend ever deleted / cleaned up? That
> is, if all of the state for checkpoint n in the backend is all that is
> needed to restore a failed job, then all state for all checkpoints m < n
> should not be needed any more, right? Can all of those old checkpoints
> be deleted from the backend? Does Flink do this?

To my knowledge, Flink takes care of deleting old checkpoints (I think it
says so in the documentation about savepoints). In my experience,
though, if a job is cancelled or crashes, the checkpoint files are
usually not cleaned up, so some housekeeping might be necessary.

> Thanks,
> Zach
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/stream_checkpointing.html
> [2] http://arxiv.org/abs/1506.08603
>



--
Konstantin Knauf * [hidden email] * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082
Re: Checkpoint state stored in backend, and deleting old checkpoint state

Ufuk Celebi
Hey Zach and Konstantin,

Great questions and answers. We can try to make this more explicit in the docs.

On Tue, Apr 5, 2016 at 8:54 PM, Konstantin Knauf
<[hidden email]> wrote:
> To my knowledge flink takes care of deleting old checkpoints (I think it
> says so in the documentation about savepoints.). In my experience
> though, if a job is cancelled or crashes, the checkpoint files are
> usually not cleaned up. So some housekeeping might be necessary.

Regarding cleanup: currently only the latest successful checkpoint is retained.

On graceful shutdown, all checkpoints should be cleaned up as far as I
know. Savepoints always have to be cleaned up manually.

On crashes, the checkpoint state has to be cleaned up manually (if the
JVM shutdown hooks did not run).
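
For the manual cleanup case, a housekeeping script might look roughly like the Python sketch below. It rests on assumptions that are not from this thread: that each checkpoint sits in its own sub-directory named `chk-<number>` (a hypothetical layout; check what your backend actually writes), that the directory is reachable as a local path (HDFS would need an HDFS client instead), and that no job is still writing there. The function names are made up for the example.

```python
import re
import shutil
import tempfile
from pathlib import Path

# Assumption: checkpoints live in sub-directories named "chk-<number>".
CHECKPOINT_DIR = re.compile(r"^chk-(\d+)$")

def stale_checkpoints(root):
    """Return all checkpoint directories under root except the newest one."""
    found = []
    for child in Path(root).iterdir():
        match = CHECKPOINT_DIR.match(child.name)
        if child.is_dir() and match:
            found.append((int(match.group(1)), child))
    found.sort()
    return [path for _, path in found[:-1]]

def clean_up(root, dry_run=True):
    """Delete stale checkpoint directories; with dry_run only report them."""
    stale = stale_checkpoints(root)
    if not dry_run:
        for path in stale:
            shutil.rmtree(path)
    return [path.name for path in stale]

# Demo on a temporary local directory standing in for the backend.
with tempfile.TemporaryDirectory() as root:
    for n in (7, 8, 12):
        (Path(root) / f"chk-{n}").mkdir()
    removed = clean_up(root, dry_run=False)
    remaining = sorted(p.name for p in Path(root).iterdir())
    print(removed, remaining)
```

Keeping `dry_run=True` as the default makes it harder to delete something by accident; only the newest checkpoint is kept, matching the retention described above.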

@Konstantin: did you have lingering state without crashes?

– Ufuk
Re: Checkpoint state stored in backend, and deleting old checkpoint state

snntr
Hi Ufuk,

I thought so, but I am not sure when and where ;) I will let you know
if I come across it again.

Cheers,

Konstantin

Re: Checkpoint state stored in backend, and deleting old checkpoint state

Zach Cox
Thanks for the details Konstantin and Ufuk!


Re: Checkpoint state stored in backend, and deleting old checkpoint state

Stephan Ewen
Hi Zach!

I am working on incremental checkpointing and hope to have it in master in the next few weeks.

The current approach is to have a full, self-contained checkpoint every once in a while and incremental checkpoints most of the time. Having a full checkpoint every now and then spares you from re-applying an endless set of deltas on recovery.
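
The idea can be sketched with a toy model in Python. This is only an illustration under stated assumptions, not the implementation being worked on: `FULL_EVERY`, `take_checkpoint` and `restore` are hypothetical names, the state is a plain dictionary, and keys are never deleted.

```python
FULL_EVERY = 4  # assumption: every 4th checkpoint is a full one

def take_checkpoint(n, state, dirty):
    """Return a full snapshot, or a delta holding only keys changed since the last checkpoint."""
    if n % FULL_EVERY == 0:
        return {"id": n, "kind": "full", "data": dict(state)}
    return {"id": n, "kind": "delta", "data": {k: state[k] for k in dirty}}

def restore(checkpoints):
    """Rebuild state from the latest full checkpoint plus the deltas taken after it."""
    last_full = max(i for i, c in enumerate(checkpoints) if c["kind"] == "full")
    state = dict(checkpoints[last_full]["data"])
    for delta in checkpoints[last_full + 1:]:
        state.update(delta["data"])
    return state, len(checkpoints) - 1 - last_full

state, checkpoints = {}, []
for n in range(7):
    dirty = set()
    for key in (f"user-{n}", "user-0"):  # one new key and one hot key per interval
        state[key] = state.get(key, 0) + 1
        dirty.add(key)
    checkpoints.append(take_checkpoint(n, state, dirty))

recovered, deltas_applied = restore(checkpoints)
print(recovered == state, deltas_applied)
```

Recovery never has to replay more than `FULL_EVERY - 1` deltas, which is the bound the periodic full checkpoint buys.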

Related to that is making the checkpointing asynchronous, so that normal operations no longer see any disruption.
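
As a rough picture of what asynchronous means here, the Python sketch below copies the state on the processing thread and leaves the slow write to a background thread. Everything in it is an assumption for illustration: the in-memory `backend` dictionary stands in for HDFS, the `sleep` simulates transfer time, and the shallow `dict` copy only works because the values are immutable numbers.

```python
import threading
import time

backend = {}  # stands in for HDFS or another durable store

def write_snapshot(checkpoint_id, snapshot):
    """Slow write to the backend, run off the processing thread."""
    time.sleep(0.05)  # simulated transfer time
    backend[checkpoint_id] = snapshot

def checkpoint_async(checkpoint_id, state):
    """Copy the state synchronously, then persist the copy in the background."""
    snapshot = dict(state)  # the only work done on the processing thread
    writer = threading.Thread(target=write_snapshot, args=(checkpoint_id, snapshot))
    writer.start()
    return writer

state = {"user-1": 3, "user-2": 5}
writer = checkpoint_async(1, state)

# Processing continues while the write is in flight.
state["user-1"] += 1
state["user-3"] = 1

writer.join()
print(backend[1])
```

The persisted snapshot reflects the state at the moment the checkpoint was triggered, not the updates that arrived during the write.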

Greetings,
Stephan

Re: Checkpoint state stored in backend, and deleting old checkpoint state

Zach Cox
Hi Stephan - incremental checkpointing sounds really interesting and useful. I look forward to trying it out.

Thanks,
Zach

