 
	
					
		
	
					| We have enabled external checkpoints (every 30s), retaining the two latest external checkpoints. We are trying to track down a problem where the recovery, checkpoint and external checkpoint directories explode in size. With about 20-30 jobs running, these directories grow from 60-70 files to hundreds of thousands of files. For example, here are the stats from a recent state we got to:
  - flink/checkpoints: 774824 objects, 5.4 GB
  - flink/ext-checkpoints: 229300 objects, 171.8 MB
  - flink/recovery: 229531 objects, 909.7 MB
Under these circumstances, the JobManager/AppMaster becomes completely unresponsive. The only option seems to be to stop it, cull those directories and restart, which is not ideal. Appreciate any insight on this. Also, is there any documentation on best/ops practices for maintaining these directories? For example, is it OK to have a reaper script that cleans out everything in these directories that is, say, 3 days old? Thanks Prashant | 
 
	
					
		
	
					| To add one more data point... it seems like the recovery directory is the bottleneck somehow.. so if we delete the recovery directory and restart the job manager - it comes back and is responsive. Of course, we lose all jobs, since none can be recovered... and that is of course not ideal. So the question seems to be why the recovery directory grows exponentially in the first place. I can't imagine we're the only ones to see this... or we must be configuring something wrong while testing Flink 1.3.1 Thanks for your help in advance Prashant | 
 
	
					
		
	
					| Hi! I am looping in Stefan and Xiaogang, who worked a lot on incremental checkpointing. Some background on incremental checkpoints: they store "pieces" of the state (RocksDB SSTables) that are shared between checkpoints, so they naturally use more files than non-incremental checkpoints. You could help us understand this with a few more details:
  - Does it only occur with incremental checkpoints, or also with regular checkpoints?
  - How many checkpoints do you retain?
  - Do you use externalized checkpoints?
  - Do you use a highly-available setup with ZooKeeper?
Thanks, Stephan
 | 
 
	
					
		
	
					| Hi Prashantnayak, thanks a lot for reporting this problem. Can you provide more details so we can address it? My guess is that the master has to delete too many files when a checkpoint is subsumed, which is very common in our cases. The number of files in the recovery directory will increase if the master cannot delete these files in time. This usually happens when the checkpoint interval is very small and the degree of parallelism is very large. Regards, Xiaogang
 | 
 
	
					
		
	
					| 
		Hi Xiaogang and Stephan
 Thank you for your response. Sorry about the delay in responding (was traveling).
 We've been trying to figure out what triggers this, and your point about the master not being able to delete files "in time" seems to be correct. We've been testing two different environments:
 1. A few jobs (< 10) that have each processed a large number of records (e.g. > 200-300 million).
 2. Many jobs (> 40) that are each processing a very low number of records.
 In (1), growth of the recovery and checkpoint directories is roughly proportional to the number of jobs and the number of retained checkpoints configured (we set it to 2). In (2), growth of the recovery, checkpoint and ext-checkpoint directories is very fast. This environment eventually gets bogged down, becomes unresponsive and then dies.
 To answer some of your other questions:
   - Does it only occur with incremental checkpoints, or also with regular checkpoints? We believe this occurs in both cases.
   - How many checkpoints do you retain? We retain 2.
   - Do you use externalized checkpoints? Yes, and we set retention = 2 and retain_on_cancellation.
   - Do you use a highly-available setup with ZooKeeper? Yes, we do.
 We recently bumped up JobManager (AppMaster) CPU and heap in environment #2 (increased to 4 CPU, 2GB heap, 2.5GB memory allocated to the Mesos container), but that has had no effect. We would definitely appreciate any additional insight you might be able to provide. This is impeding us in production deployments. Is there any way we can at least mitigate this growth? For example, we have a script that can be cron'd to delete files in the S3 recovery directory that are older than X hours. Is it OK to run this script and keep only the last hour's worth of recovery files?
 We notice that there are about three types of files in recovery:
   - completedCheckpointXXXYYY
   - mesosWorkerStoreXXXYYY
   - submittedJobGraphXXXXYYY
 Is it OK to have the cron job prune all of these so we only keep the last hour's worth, or perhaps just the completedCheckpoint files? Happy to provide any additional detail you need. Just let me know. Thanks Prashant | 
 
	
					
		
	
					| 
		Wanted to add: we took some stack traces and memory dumps. We will post them or send them to you, but the stack trace indicates that the appmaster is spending a lot of time in the AWS S3 library trying to list an S3 directory (recovery?).
 Thanks Prashant | 
 
	
					
		
	
					| 
		Hi Xiaogang and Stephan
 We're continuing to test and have now set up the cluster with incremental RocksDB checkpointing disabled and the checkpoint interval increased from 30s to 120s (not ideal really :-( ). We'll run it with a large number of jobs and report back on whether this setup shows improvement. Appreciate any other insights you might have around this problem. Thanks Prashant | 
 
	
					
		
	
					| Hi Prashant! I assume you are using Flink 1.3.0 or 1.3.1? Here are some things you can do:
  - I would try disabling incremental checkpointing for a start and see what happens then. That should already reduce the number of files.
  - Is it possible for you to run a patched version of Flink? If yes, can you try the following: in the class "FileStateHandle", in the method "discardState()", remove the code around "FileUtils.deletePathIfEmpty(...)". This is probably not working well when hitting too many S3 files.
  - You can delete old "completedCheckpointXXXYYY" files, but please do not delete the other two types; they are needed for HA recovery.
Greetings, Stephan | 
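The third point above can be turned into a filter for a cleanup job. The following is only a minimal sketch: the class name `RecoveryPruneFilter`, the method `isPrunable` and the age threshold are hypothetical and not part of Flink, and it assumes file names start with the type prefixes quoted in this thread. Age alone does not prove that a completedCheckpoint file is unreferenced (a job that has been stopped for longer than the threshold may still need its latest file), so a real script would need additional checks.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper: decides which HA recovery files a cleanup job may delete. */
final class RecoveryPruneFilter {
    // The only file type described in the thread as safe to prune.
    static final String PRUNABLE_PREFIX = "completedCheckpoint";

    /** True only for completedCheckpoint* files last modified before (now - maxAge). */
    static boolean isPrunable(String fileName, Instant lastModified, Instant now, Duration maxAge) {
        if (!fileName.startsWith(PRUNABLE_PREFIX)) {
            // submittedJobGraph*, mesosWorkerStore* and unknown files are always kept.
            return false;
        }
        return lastModified.isBefore(now.minus(maxAge));
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        Duration maxAge = Duration.ofHours(3); // assumed threshold, pick your own
        Instant fiveHoursAgo = now.minus(Duration.ofHours(5));
        System.out.println(isPrunable("completedCheckpointXXXYYY", fiveHoursAgo, now, maxAge));
        System.out.println(isPrunable("submittedJobGraphXXXXYYY", fiveHoursAgo, now, maxAge));
    }
}
```

The filter is deliberately an allow-list: anything that does not match the one prunable prefix is kept, so a new or unexpected file type is never deleted by accident.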
 
	
					
		
	
					| Hi Prashant! Flink's S3 integration currently goes through Hadoop's S3 file system (as you probably noticed). It seems that Hadoop's S3 file system is not really well suited for what we want to do, and we are looking to drop it and replace it with something direct (independent of Hadoop) in the coming release. One essential thing is to make sure that the "trash" is not activated in the configuration, as it adds very high overhead to the delete operations. Best, Stephan
 | 
| Hi Stephan, making Flink's S3 integration independent of Hadoop is great. We've been running into a lot of Hadoop configuration trouble when trying to enable Flink checkpointing with S3 on AWS EMR. Is there a concrete plan, or are there tickets created yet for tracking? Thanks, Bowen
 | 
 
	
					
		
	
					| 
				In reply to this post by Stephan Ewen
			 
		Thanks Stephan
 We can confirm that turning off RocksDB incremental checkpointing helps and greatly reduces the number of files (from tens of thousands to low thousands). We still see an inflection point: when running > 50 jobs, the appmaster stops deleting files from S3, which leads to unbounded growth (slower without incremental checkpointing) of the S3 recovery directory. We would be willing to try a patched version (we already have a fork). Just to confirm: are you suggesting we delete the line "fs.delete(filePath, false);" from discardState()?
```
@Override
public void discardState() throws Exception {
    FileSystem fs = getFileSystem();

    fs.delete(filePath, false);

    try {
        FileUtils.deletePathIfEmpty(fs, filePath.getParent());
    } catch (Exception ignored) {}
}
```
 | 
 
	
					
		
	
					| 
				In reply to this post by Stephan Ewen
			 
		Hi Stephan
 Unclear on what you mean by the "trash" option. I thought that was only available for command-line Hadoop and not applicable to the Java API, which is what Flink uses? If there is a configuration for the Flink/Hadoop connector that we can use, please let me know. Also, one additional thing about S3: it supports "versioned" buckets. With versioning, a delete on an object does not actually delete it (unless there is a bucket lifecycle policy). I think you should recommend that Flink users who rely on S3 turn off bucket versioning, since versioning does not seem to be something Flink needs. Thanks Prashant | 
 
	
					
		
	
					| 
		Hi,
 I think Stephan was talking about removing this part:
   try { FileUtils.deletePathIfEmpty(fs, filePath.getParent()); } catch (Exception ignored) {}
 This part should *NOT* be removed:
   fs.delete(filePath, false);
 The reason is that the first part is only an additional cleanup step: for each deleted file, it checks whether the containing directory is now empty and, if so, removes the directory. Checking for an empty directory means listing all files in that directory, which is expensive in S3. The only downside of removing that code is orphaned empty checkpoint directories, which could be cleaned up by a script. The delete of the file itself must remain because that is how we release files from old checkpoints. Best, Stefan | 
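To make the cost argument concrete, here is a toy model that only counts requests. It is a sketch under stated assumptions: `CountingStore` is a hypothetical in-memory stand-in, not Flink's `FileSystem` and not an S3 client, and it ignores the fact that a real LIST on a large prefix is far slower than a single DELETE.

```java
import java.util.TreeSet;

/** Toy in-memory object store that counts requests. Hypothetical; not an S3 or Flink API. */
final class CountingStore {
    private final TreeSet<String> keys = new TreeSet<>();
    int deleteCalls = 0;
    int listCalls = 0;

    void put(String key) {
        keys.add(key);
    }

    /** Stands in for fs.delete(filePath, false): one DELETE request. */
    void delete(String key) {
        deleteCalls++;
        keys.remove(key);
    }

    /** Stands in for the emptiness check: one LIST request on the parent prefix. */
    boolean isPrefixEmpty(String prefix) {
        listCalls++;
        String first = keys.ceiling(prefix);
        return first == null || !first.startsWith(prefix);
    }

    /** Discards every file under one checkpoint prefix; returns {deleteCalls, listCalls}. */
    static int[] simulate(int fileCount, boolean checkParentAfterEachDelete) {
        CountingStore store = new CountingStore();
        String prefix = "checkpoints/chk-1/"; // made-up layout for illustration
        for (int i = 0; i < fileCount; i++) {
            store.put(prefix + "file-" + i);
        }
        for (int i = 0; i < fileCount; i++) {
            store.delete(prefix + "file-" + i);
            if (checkParentAfterEachDelete) {
                store.isPrefixEmpty(prefix); // the step the suggested patch removes
            }
        }
        return new int[] {store.deleteCalls, store.listCalls};
    }

    public static void main(String[] args) {
        int[] original = simulate(1000, true);
        int[] patched = simulate(1000, false);
        System.out.println("original: deletes=" + original[0] + " lists=" + original[1]);
        System.out.println("patched:  deletes=" + patched[0] + " lists=" + patched[1]);
    }
}
```

In this model the unpatched path issues one LIST per deleted file, while the patched path issues none. The price of the patch is the empty directories mentioned above.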
 
	
					
		
	
					| Stefan is correct in pointing out the lines to remove. FYI: We are trying to add this patch to the upcoming 1.3.2 release, which should also help: https://github.com/apache/flink/pull/4397 | 
 
	
					
		
	
					| 
		Thanks Stephan and Stefan
 We're looking forward to this patch in 1.3.2. Whether we use a patched version in the meantime depends on when 1.3.2 is going to be available. We're also implementing a cron job to remove orphaned/older completedCheckpoint files per your recommendations. One caveat with a job like that is that we have to check whether a particular job is stopped/paused/down, and also whether the JobManager is down, so we don't accidentally remove valid checkpoint files. This makes it a bit dicey; the ideal, of course, is not to have to do this. The move away from hadoop/s3 would be welcome as well. Flink job state is critical to us since we have very long-running jobs (months) processing hundreds of millions of records. Thanks Prashant | 
 
	
					
		
	
					| 
		Hi,
 your concerns about deleting files when using incremental checkpoints are very valid. Deleting empty checkpoint folders is obviously OK. As for files, I have recently added some additional logging to the checkpointing mechanism to report the files referenced in the last checkpoint. I will try to include that logging in 1.3.2 as well. Based on this, you could make safe assumptions about which files are actually orphaned. I am even considering packing this list as a plain text file with the checkpoint, to make this more transparent for users. Best, Stefan | 
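Once such a list of referenced files is available, finding orphans is a set difference. The sketch below assumes the list has already been parsed from the logs into a set of names. The class `OrphanFinder` and its method are hypothetical, and nothing here reflects the actual log format. With more than one retained checkpoint, the referenced set would presumably need to be the union over all retained checkpoints, and files written by a checkpoint still in progress would not be in it yet, so a cautious script would also skip recently created files.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper: files present in storage but not referenced by any retained checkpoint. */
final class OrphanFinder {
    /** Returns a sorted copy of filesInStorage minus referencedFiles; inputs are not modified. */
    static Set<String> findOrphans(Set<String> filesInStorage, Set<String> referencedFiles) {
        Set<String> orphans = new TreeSet<>(filesInStorage);
        orphans.removeAll(referencedFiles);
        return orphans;
    }

    public static void main(String[] args) {
        // Made-up file names for illustration only.
        Set<String> stored = new HashSet<>(Arrays.asList("a.sst", "b.sst", "c.sst"));
        Set<String> referenced = new HashSet<>(Arrays.asList("b.sst"));
        System.out.println(findOrphans(stored, referenced));
    }
}
```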
 
	
					
		
	
					| 
		Thanks Stefan.
 +1 on "I am even considering packing this list as a plain text file with the checkpoint, to make this more transparent for users" that is def. more Ops friendly... Thanks Prashant | 
 
	

 
	
	
		
