Flink incremental checkpointing - how long is data kept in the shared folder

scarmeli
We are using Flink 1.6.3, keeping the checkpoints in CEPH, retaining only one checkpoint at a time, and using incremental checkpointing with RocksDB.

We run windows with an allowed lateness of 3 days, which means we expect no data in the checkpoint shared folder to be kept for more than 3-4 days. Still, we see data older than that, e.g. if today is 7/4 there are some files from 2/4.

Sometimes we see checkpoints that we assume (because their index number is out of sync) belong to a job that crashed, where the checkpoint was not used to restore the job.

My questions are:

1. Why do we see data older than the lateness configuration?
2. How do I know which files belong to a valid checkpoint, and not to a checkpoint of a crashed job, so we can delete those files?
Re: Flink incremental checkpointing - how long is data kept in the shared folder

Yun Tang
Hi Shachar

Why do we see data older than the lateness configuration?
There might be three reasons:
  1. RocksDB genuinely still needs that file for the current checkpoint. If a file named 42.sst was uploaded on 2/4 as part of some old checkpoint, the current checkpoint can still include that same 42.sst if the file has never been compacted since then. This is possible in theory.
  2. Your checkpoint size is large and the checkpoint coordinator could not remove old files fast enough before exiting.
  3. The file was created by a crashed task manager and is not known to the checkpoint coordinator.
How do I know which files belong to a valid checkpoint, and not to a checkpoint of a crashed job, so we can delete those files?
You have to call Checkpoints#loadCheckpointMetadata [1] to load the latest _metadata in the checkpoint directory and compare the file paths it references with the files currently in that directory. Files that are not in the checkpoint metadata and are older than the latest checkpoint can be removed. You could follow this approach to debug, or maybe I could later write a tool that identifies which files can be deleted.

[1] https://github.com/apache/flink/blob/693cb6adc42d75d1db720b45013430a4c6817d4a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L96
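The comparison described above can be sketched with plain java.nio once the referenced file names have been extracted from the metadata. This is only a sketch: the class and method names (OrphanFinder, findUnreferenced) are illustrative, and the referenced set would in practice be built from the paths returned by Checkpoints#loadCheckpointMetadata.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class OrphanFinder {
    /**
     * Returns file names present in the given directory that are NOT
     * referenced by the latest checkpoint metadata. Files returned here are
     * candidates for deletion (subject to being older than the latest
     * completed checkpoint).
     */
    public static Set<String> findUnreferenced(Path checkpointDir, Set<String> referenced)
            throws IOException {
        try (Stream<Path> files = Files.list(checkpointDir)) {
            return files
                    .map(p -> p.getFileName().toString())
                    .filter(name -> !referenced.contains(name))
                    .collect(Collectors.toSet());
        }
    }
}
```

Note that this only lists candidates; deciding whether a candidate is truly safe to delete still requires checking its age against the latest completed checkpoint, as described above.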


Best
Yun Tang


From: Shachar Carmeli <[hidden email]>
Sent: Tuesday, April 7, 2020 16:19
To: [hidden email] <[hidden email]>
Subject: Flink incremental checkpointing - how long does data is kept in the share folder
 
Re: Flink incremental checkpointing - how long is data kept in the shared folder

scarmeli
Thank you for the quick response.
Your answer relates to the checkpoint folder that contains the _metadata file, e.g. chk-1829.
What about the "shared" folder? How do I know which files in that folder are still relevant and which are left over from a failed checkpoint? They are not directly related to the _metadata of a checkpoint, or am I missing something?


On 2020/04/07 18:37:57, Yun Tang <[hidden email]> wrote:

Re: Flink incremental checkpointing - how long is data kept in the shared folder

Yun Tang
Hi Shachar

I think you can refer to [1] for the directory structure of checkpoints. The '_metadata' file records which files each checkpoint consists of, e.g. the file paths under the 'shared' folder. As I said before, you need to call Checkpoints#loadCheckpointMetadata to load the '_metadata' and learn which files belong to that checkpoint.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#directory-structure
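The directory structure described in [1] looks roughly like this (the job id and checkpoint number are illustrative):

```
/user-configured-checkpoint-dir
    /{job-id}
        /chk-1829
            _metadata        <- metadata for this checkpoint, references the files below
            ...              <- exclusive (non-shared) state files of this checkpoint
        /shared
            ...              <- sst files reused across incremental checkpoints
        /taskowned
            ...              <- state owned by the task managers
```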



Best
Yun Tang


From: Shachar Carmeli <[hidden email]>
Sent: Sunday, April 12, 2020 15:32
To: [hidden email] <[hidden email]>
Subject: Re: Flink incremental checkpointing - how long does data is kept in the share folder
 
Re: Flink incremental checkpointing - how long is data kept in the shared folder

scarmeli
Hi Yum
I noticed that some files are related to the checkpoint but are not mentioned in the metadata file,
and some of the files that are referenced in the metadata file (usually ByteStreamStateHandle) are not in the shared folder.
Can you explain this behaviour?

See the code I was using:

final Savepoint savepoint =
        Checkpoints.loadCheckpointMetadata(in, CheckpointTool.class.getClassLoader());

final long[] totalSize = {0};
final Set<String> pathSharedFromMetadata = savepoint.getOperatorStates().stream()
        .flatMap(operatorState -> operatorState.getSubtaskStates().values().stream())
        .flatMap(operatorSubtaskState -> operatorSubtaskState.getManagedKeyedState().stream())
        .flatMap(keyedStateHandle -> ((IncrementalKeyedStateHandle) keyedStateHandle)
                .getSharedState().values().stream())
        .map(streamStateHandle -> {
            totalSize[0] += streamStateHandle.getStateSize();
            if (streamStateHandle instanceof FileStateHandle) {
                return ((FileStateHandle) streamStateHandle).getFilePath().getName();
            }
            // ByteStreamStateHandle keeps its state inline; only a handle name exists.
            final String handleName =
                    ((ByteStreamStateHandle) streamStateHandle).getHandleName();
            return new File(handleName).getName();
        })
        .collect(Collectors.toSet());

Thanks in advance
Shachar

On 2020/04/13 14:30:40, Yun Tang <[hidden email]> wrote:

Re: Flink incremental checkpointing - how long is data kept in the shared folder

Yun Tang
Hi Shachar

You can refer to [1] for the directory structure. The files (usually ByteStreamStateHandle) which are not in the shared folder are exclusive state, such as operator state or the exclusive files uploaded for each incremental checkpoint. Actually, I don't understand why you say some files are related to the checkpoint but are not mentioned in the metadata file; how do you judge that they are related to a specific checkpoint?

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#directory-structure

BTW, my name is "Yun" which means cloud in Chinese, not the delicious "Yum" 🙂

Best
Yun Tang

From: Shachar Carmeli <[hidden email]>
Sent: Monday, April 20, 2020 15:36
To: [hidden email] <[hidden email]>
Subject: Re: Flink incremental checkpointing - how long does data is kept in the share folder
 
Re: Flink incremental checkpointing - how long is data kept in the shared folder

scarmeli
Hi Yun,

First of all, sorry for the naming mistake; it was a typo.

 How to judge that they are related to a specific checkpoint?
I judged by removing the files and restarting the job, to see whether it fails.

In the code I posted earlier I missed the privateState; that is why I was missing files.

What about recovery folders? How can I know when and which files I can remove?
I see the folder contains recovery-<job-name> and a list of completedCheckpoint3bc1020bb0f9 files.

Best,
Shachar

On 2020/04/20 12:14:35, Yun Tang <[hidden email]> wrote:

Re: Flink incremental checkpointing - how long is data kept in the shared folder

Yun Tang
Hi Shachar

The basic rules for removing old data:
  1. No two running Flink jobs use the same checkpoint directory (at the job-id level).
  2. Files that are not recorded in the latest checkpoint metadata are candidates for removal.
  3. If the checkpoint directory is still being written to by a Flink job, files created after the completion time of the last checkpoint should also be filtered out (assuming you do not retain multiple checkpoints). The remaining files are safe to remove.
A simple approach is to stop the job and then remove all files not recorded in the checkpoint metadata.
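Rules 2 and 3 can be combined into one filter over the shared folder. The sketch below uses plain java.nio; the class and method names (SharedFolderCleaner, deletable) are illustrative, the referenced set would come from the latest _metadata, and the last-checkpoint completion time would come from the job's checkpoint history.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SharedFolderCleaner {
    /**
     * A file is deletable only if it is not referenced by the latest
     * checkpoint metadata AND is older than the completion time of the last
     * checkpoint, so that uploads of an in-flight checkpoint are kept.
     */
    public static List<Path> deletable(Path sharedDir, Set<String> referenced,
                                       Instant lastCheckpointCompleted) throws IOException {
        try (Stream<Path> files = Files.list(sharedDir)) {
            return files
                    .filter(p -> !referenced.contains(p.getFileName().toString()))
                    .filter(p -> {
                        try {
                            return Files.getLastModifiedTime(p).toInstant()
                                    .isBefore(lastCheckpointCompleted);
                        } catch (IOException e) {
                            return false; // when in doubt, keep the file
                        }
                    })
                    .collect(Collectors.toList());
        }
    }
}
```

If the job is stopped first, as suggested above, the time filter becomes unnecessary and only the referenced-set check is needed.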

Best
Yun Tang

From: Shachar Carmeli <[hidden email]>
Sent: Tuesday, April 21, 2020 20:46
To: [hidden email] <[hidden email]>
Subject: Re: Flink incremental checkpointing - how long does data is kept in the share folder
 
Hi Yun,

First of all, sorry for the naming mistake, it was a typo.

> How to judge that they are related to a specific checkpoint?
I judged by removing the files, restarting the job, and seeing whether it fails.

In the code below I missed the privateState; that is why I was missing files.

What about recovery folders? How can I know when and which files I can remove?
I see the folder contains recovery-<job-name> and a list of files like completedCheckpoint3bc1020bb0f9.

Best,
Shachar

On 2020/04/20 12:14:35, Yun Tang <[hidden email]> wrote:
> Hi Shachar
>
> You can refer to [1] for the directory structure. The files (usually ByteStreamStateHandle) that are not in the shared folder are exclusive state, such as operator state or exclusive files uploaded during each incremental checkpoint. Actually, I don't understand why you would say some files are not mentioned in the metadata file but are still related to the checkpoint. How to judge that they are related to a specific checkpoint?
>
> BTW, my name is "Yun" which means cloud in Chinese, not the delicious "Yum" 🙂
>
> Best
> Yun Tang
> ________________________________
> From: Shachar Carmeli <[hidden email]>
> Sent: Monday, April 20, 2020 15:36
> To: [hidden email] <[hidden email]>
> Subject: Re: Flink incremental checkpointing - how long does data is kept in the share folder
>
> Hi Yum
> I noticed that some files are related to the checkpoint but are not mentioned in the metadata file,
> and some of the files that are mentioned in the metadata file (usually ByteStreamStateHandle) are not in the shared folder.
> Can you explain this behaviour?
>
> See the code I was using:
> final Savepoint savepoint = Checkpoints.loadCheckpointMetadata(in, CheckpointTool.class.getClassLoader());
>
> final Set<String> pathSharedFromMetadata = savepoint.getOperatorStates().stream()
>         .flatMap(operatorState -> operatorState.getSubtaskStates().values().stream())
>         .flatMap(operatorSubtaskState -> operatorSubtaskState.getManagedKeyedState().stream())
>         .flatMap(keyedStateHandle -> ((IncrementalKeyedStateHandle) keyedStateHandle).getSharedState().values().stream())
>         .map(streamStateHandle -> {
>             totalSize[0] += streamStateHandle.getStateSize();
>             if (streamStateHandle instanceof FileStateHandle) {
>                 return ((FileStateHandle) streamStateHandle).getFilePath().getName();
>             }
>             final String handleName = ((ByteStreamStateHandle) streamStateHandle).getHandleName();
>             return new File(handleName).getName();
>         })
>         .collect(Collectors.toSet());
>
> Thanks in advance
> Shachar
>
> On 2020/04/13 14:30:40, Yun Tang <[hidden email]> wrote:
> > Hi Shachar
> >
> > I think you could refer to [1] for the directory structure of checkpoints. The '_metadata' file records which checkpointed data files belong to the checkpoint, e.g. file paths under the 'shared' folder. As I said before, you need to call Checkpoints#loadCheckpointMetadata to load '_metadata' to know which files belong to that checkpoint.
> >
> >
> > [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#directory-structure
> >
> > Best
> > Yun Tang
> >
> > ________________________________
> > From: Shachar Carmeli <[hidden email]>
> > Sent: Sunday, April 12, 2020 15:32
> > To: [hidden email] <[hidden email]>
> > Subject: Re: Flink incremental checkpointing - how long does data is kept in the share folder
> >
> > Thank you for the quick response.
> > Your answer relates to the checkpoint folder that contains the _metadata file, e.g. chk-1829.
> > What about the "shared" folder? How do I know which files in that folder are still relevant and which are left over from a failed checkpoint? They are not directly referenced by the _metadata checkpoint, or am I missing something?
> >
> >
> > On 2020/04/07 18:37:57, Yun Tang <[hidden email]> wrote:
> > > Hi Shachar
> > >
> > > Why do we see data that is older than the lateness configuration?
> > > There might be three reasons:
> > >
> > >   1.  RocksDB really still needs that file in the current checkpoint. If a file named 42.sst was uploaded on 2/4 as part of some old checkpoint, the current checkpoint could still include that same 42.sst if the file has never been compacted since then. This is possible in theory.
> > >   2.  Your checkpoint size is large and the checkpoint coordinator could not remove files fast enough before exiting.
> > >   3.  That file was created by a crashed task manager and is not known to the checkpoint coordinator.
> > >
> > > How do I know which files belong to a valid checkpoint and not a checkpoint of a crashed job, so we can delete those files?
> > > You have to call Checkpoints#loadCheckpointMetadata [1] to load the latest _metadata in the checkpoint directory and compare the file paths it records with the files currently in the checkpoint directory. Files that are not in the checkpoint metadata and are older than the latest checkpoint can be removed. You could follow this approach to debug, or maybe I could later write a tool to help identify which files can be deleted.
> > >
> > > [1] https://github.com/apache/flink/blob/693cb6adc42d75d1db720b45013430a4c6817d4a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L96
> > >
> > > Best
> > > Yun Tang
> > >
> > > ________________________________
> > > From: Shachar Carmeli <[hidden email]>
> > > Sent: Tuesday, April 7, 2020 16:19
> > > To: [hidden email] <[hidden email]>
> > > Subject: Flink incremental checkpointing - how long does data is kept in the share folder
> > >
> > > We are using Flink 1.6.3, keeping checkpoints in Ceph, retaining only one checkpoint at a time, using incremental checkpointing with RocksDB.
> > >
> > > We run windows with an allowed lateness of 3 days, which means we expect no data in the checkpoint shared folder to be kept longer than 3-4 days. Still, we see data older than that,
> > > e.g.
> > > if today is 7/4, there are some files from 2/4.
> > >
> > > Sometimes we see checkpoints that we assume (because their index numbers are out of sync) belong to a job that crashed, so the checkpoint was not used to restore the job.
> > >
> > > My questions are:
> > >
> > > Why do we see data that is older than the lateness configuration?
> > > How do I know which files belong to a valid checkpoint rather than to a checkpoint of a crashed job, so we can delete those files?
> > >
> >
>
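The comparison Yun describes in the thread (paths recorded in the latest _metadata versus files actually on disk, with anything unreferenced and older than the latest checkpoint treated as removable) can be mocked up outside of Flink along these lines. All file names and timestamps below are invented; a real tool would build the referenced set by parsing the latest _metadata with Checkpoints#loadCheckpointMetadata:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Stand-alone mock-up, NOT Flink code. "referencedByMetadata" stands in for
// the file names extracted from the latest checkpoint's _metadata.
public class StaleFileFinder {

    /** Returns files in sharedDir that are neither referenced nor newer than the latest checkpoint. */
    public static Set<Path> findStale(Path sharedDir,
                                      Set<String> referencedByMetadata,
                                      FileTime latestCheckpointTime) throws IOException {
        try (Stream<Path> files = Files.list(sharedDir)) {
            return files
                    .filter(p -> !referencedByMetadata.contains(p.getFileName().toString()))
                    .filter(p -> {
                        try {
                            // strictly older than the latest completed checkpoint
                            return Files.getLastModifiedTime(p).compareTo(latestCheckpointTime) < 0;
                        } catch (IOException e) {
                            return false; // keep anything we cannot stat
                        }
                    })
                    .collect(Collectors.toSet());
        }
    }
}
```

Files newer than the latest checkpoint are deliberately kept, since a running job may still be uploading them (rule 3 above).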