Hi,
we have stream job with quite large state (few GB), we're using FSStateBackend and we're storing checkpoints in hdfs. What we observe is that v. often old checkpoints are not discarded properly. In hadoop logs I can see: 2016-05-10 12:21:06,559 INFO BlockStateChange: BLOCK* addToInvalidates: blk_1084791727_11053122 10.10.113.10:50010 2016-05-10 12:21:06,559 INFO org.apache.hadoop.ipc.Server: IPC Server handler 9 on 8020, call org.apache.hadoop.hdfs.protocol.ClientProtocol.delete from 10.10.113.9:49233 Call#12337 Retry#0 org.apache.hadoop.fs.PathIsNotEmptyDirectoryException: `/flink/checkpoints_test/570d6e67d571c109daab468e5678402b/chk-62 is non empty': Directory is not empty at org.apache.hadoop.hdfs.server.namenode.FSDirDeleteOp.delete(FSDirDeleteOp.java:85) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.delete(FSNamesystem.java:3712) While on flink side (jobmanager log) we don't see any problems: 2016-05-10 12:20:22,636 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 62 @ 1462875622636 2016-05-10 12:20:32,507 [flink-akka.actor.default-dispatcher-240088] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 62 (in 9843 ms) 2016-05-10 12:20:52,637 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 63 @ 1462875652637 2016-05-10 12:21:06,563 [flink-akka.actor.default-dispatcher-240028] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 63 (in 13909 ms) 2016-05-10 12:21:22,636 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 64 @ 1462875682636 I see in the code that delete operations in flink are done with recursive flag set to false - but I'm not sure why the contents are not being deleted before? When we were using RocksDB backed we didn't encounter such situation. we're using flink 1.0.1 and hdfs 2.7.2. Do anybody has any idea why this could be happening? thanks, maciek |
Hey Maciek,
thanks for reporting this. Having files linger around looks like a bug to me. The idea behind having the recursive flag set to false in the AbstractFileStateHandle.discardState() call is that the FileStateHandle is actually just a single file and not a directory. The second call trying to delete the parent directory only succeeds when all other files in that directory have been deleted as well. I think this is what sometimes fails with many state handles. For RocksDB there is only a single state handle, which works well. I will open an issue for this and try to reproduce it reliably and then fix it. – Ufuk On Thu, May 12, 2016 at 10:28 AM, Maciek Próchniak <[hidden email]> wrote: > Hi, > > we have stream job with quite large state (few GB), we're using > FSStateBackend and we're storing checkpoints in hdfs. > What we observe is that v. often old checkpoints are not discarded properly. > In hadoop logs I can see: > > 2016-05-10 12:21:06,559 INFO BlockStateChange: BLOCK* addToInvalidates: > blk_1084791727_11053122 10.10.113.10:50010 > 2016-05-10 12:21:06,559 INFO org.apache.hadoop.ipc.Server: IPC Server > handler 9 on 8020, call > org.apache.hadoop.hdfs.protocol.ClientProtocol.delete from 10.10.113.9:49233 > Call#12337 Retry#0 > org.apache.hadoop.fs.PathIsNotEmptyDirectoryException: > `/flink/checkpoints_test/570d6e67d571c109daab468e5678402b/chk-62 is non > empty': Directory is not empty > at > org.apache.hadoop.hdfs.server.namenode.FSDirDeleteOp.delete(FSDirDeleteOp.java:85) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.delete(FSNamesystem.java:3712) > > While on flink side (jobmanager log) we don't see any problems: > 2016-05-10 12:20:22,636 [Checkpoint Timer] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 62 @ 1462875622636 > 2016-05-10 12:20:32,507 [flink-akka.actor.default-dispatcher-240088] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed > checkpoint 62 (in 9843 ms) > 2016-05-10 12:20:52,637 [Checkpoint Timer] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 63 @ 1462875652637 > 2016-05-10 12:21:06,563 [flink-akka.actor.default-dispatcher-240028] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed > checkpoint 63 (in 13909 ms) > 2016-05-10 12:21:22,636 [Checkpoint Timer] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 64 @ 1462875682636 > > I see in the code that delete operations in flink are done with recursive > flag set to false - but I'm not sure why the contents are not being deleted > before? > When we were using RocksDB backed we didn't encounter such situation. > we're using flink 1.0.1 and hdfs 2.7.2. > > Do anybody has any idea why this could be happening? > > thanks, > maciek > > > |
The issue is here: https://issues.apache.org/jira/browse/FLINK-3902
(My "explanation" before dosn't make sense actually and I don't see a reason why this should be related to having many state handles.) On Thu, May 12, 2016 at 3:54 PM, Ufuk Celebi <[hidden email]> wrote: > Hey Maciek, > > thanks for reporting this. Having files linger around looks like a bug to me. > > The idea behind having the recursive flag set to false in the > AbstractFileStateHandle.discardState() call is that the > FileStateHandle is actually just a single file and not a directory. > The second call trying to delete the parent directory only succeeds > when all other files in that directory have been deleted as well. I > think this is what sometimes fails with many state handles. For > RocksDB there is only a single state handle, which works well. > > I will open an issue for this and try to reproduce it reliably and then fix it. > > – Ufuk > > > On Thu, May 12, 2016 at 10:28 AM, Maciek Próchniak <[hidden email]> wrote: >> Hi, >> >> we have stream job with quite large state (few GB), we're using >> FSStateBackend and we're storing checkpoints in hdfs. >> What we observe is that v. often old checkpoints are not discarded properly. >> In hadoop logs I can see: >> >> 2016-05-10 12:21:06,559 INFO BlockStateChange: BLOCK* addToInvalidates: >> blk_1084791727_11053122 10.10.113.10:50010 >> 2016-05-10 12:21:06,559 INFO org.apache.hadoop.ipc.Server: IPC Server >> handler 9 on 8020, call >> org.apache.hadoop.hdfs.protocol.ClientProtocol.delete from 10.10.113.9:49233 >> Call#12337 Retry#0 >> org.apache.hadoop.fs.PathIsNotEmptyDirectoryException: >> `/flink/checkpoints_test/570d6e67d571c109daab468e5678402b/chk-62 is non >> empty': Directory is not empty >> at >> org.apache.hadoop.hdfs.server.namenode.FSDirDeleteOp.delete(FSDirDeleteOp.java:85) >> at >> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.delete(FSNamesystem.java:3712) >> >> While on flink side (jobmanager log) we don't see any problems: >> 2016-05-10 12:20:22,636 [Checkpoint Timer] INFO >> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering >> checkpoint 62 @ 1462875622636 >> 2016-05-10 12:20:32,507 [flink-akka.actor.default-dispatcher-240088] INFO >> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed >> checkpoint 62 (in 9843 ms) >> 2016-05-10 12:20:52,637 [Checkpoint Timer] INFO >> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering >> checkpoint 63 @ 1462875652637 >> 2016-05-10 12:21:06,563 [flink-akka.actor.default-dispatcher-240028] INFO >> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed >> checkpoint 63 (in 13909 ms) >> 2016-05-10 12:21:22,636 [Checkpoint Timer] INFO >> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering >> checkpoint 64 @ 1462875682636 >> >> I see in the code that delete operations in flink are done with recursive >> flag set to false - but I'm not sure why the contents are not being deleted >> before? >> When we were using RocksDB backed we didn't encounter such situation. >> we're using flink 1.0.1 and hdfs 2.7.2. >> >> Do anybody has any idea why this could be happening? >> >> thanks, >> maciek >> >> >> |
thanks,
I'll try to reproduce it in some test by myself... maciek On 12/05/2016 18:39, Ufuk Celebi wrote: > The issue is here: https://issues.apache.org/jira/browse/FLINK-3902 > > (My "explanation" before dosn't make sense actually and I don't see a > reason why this should be related to having many state handles.) > > On Thu, May 12, 2016 at 3:54 PM, Ufuk Celebi <[hidden email]> wrote: >> Hey Maciek, >> >> thanks for reporting this. Having files linger around looks like a bug to me. >> >> The idea behind having the recursive flag set to false in the >> AbstractFileStateHandle.discardState() call is that the >> FileStateHandle is actually just a single file and not a directory. >> The second call trying to delete the parent directory only succeeds >> when all other files in that directory have been deleted as well. I >> think this is what sometimes fails with many state handles. For >> RocksDB there is only a single state handle, which works well. >> >> I will open an issue for this and try to reproduce it reliably and then fix it. >> >> – Ufuk >> >> >> On Thu, May 12, 2016 at 10:28 AM, Maciek Próchniak <[hidden email]> wrote: >>> Hi, >>> >>> we have stream job with quite large state (few GB), we're using >>> FSStateBackend and we're storing checkpoints in hdfs. >>> What we observe is that v. often old checkpoints are not discarded properly. >>> In hadoop logs I can see: >>> >>> 2016-05-10 12:21:06,559 INFO BlockStateChange: BLOCK* addToInvalidates: >>> blk_1084791727_11053122 10.10.113.10:50010 >>> 2016-05-10 12:21:06,559 INFO org.apache.hadoop.ipc.Server: IPC Server >>> handler 9 on 8020, call >>> org.apache.hadoop.hdfs.protocol.ClientProtocol.delete from 10.10.113.9:49233 >>> Call#12337 Retry#0 >>> org.apache.hadoop.fs.PathIsNotEmptyDirectoryException: >>> `/flink/checkpoints_test/570d6e67d571c109daab468e5678402b/chk-62 is non >>> empty': Directory is not empty >>> at >>> org.apache.hadoop.hdfs.server.namenode.FSDirDeleteOp.delete(FSDirDeleteOp.java:85) >>> at >>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.delete(FSNamesystem.java:3712) >>> >>> While on flink side (jobmanager log) we don't see any problems: >>> 2016-05-10 12:20:22,636 [Checkpoint Timer] INFO >>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering >>> checkpoint 62 @ 1462875622636 >>> 2016-05-10 12:20:32,507 [flink-akka.actor.default-dispatcher-240088] INFO >>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed >>> checkpoint 62 (in 9843 ms) >>> 2016-05-10 12:20:52,637 [Checkpoint Timer] INFO >>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering >>> checkpoint 63 @ 1462875652637 >>> 2016-05-10 12:21:06,563 [flink-akka.actor.default-dispatcher-240028] INFO >>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed >>> checkpoint 63 (in 13909 ms) >>> 2016-05-10 12:21:22,636 [Checkpoint Timer] INFO >>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering >>> checkpoint 64 @ 1462875682636 >>> >>> I see in the code that delete operations in flink are done with recursive >>> flag set to false - but I'm not sure why the contents are not being deleted >>> before? >>> When we were using RocksDB backed we didn't encounter such situation. >>> we're using flink 1.0.1 and hdfs 2.7.2. >>> >>> Do anybody has any idea why this could be happening? >>> >>> thanks, >>> maciek >>> >>> >>> |
Hi Ufuk,
It seems I messed it up a bit :) I cannot comment on jira, since it's temporarily locked... 1. org.apache.hadoop.fs.PathIsNotEmptyDirectoryException: `/flink/checkpoints_test/570d6e67d571c109daab468e5678402b/chk-62 is non empty': Directory is not empty - this seems to be expected behaviour, as AbstractFileStateHandle.discardState(): // send a call to delete the checkpoint directory containing the file. This will // fail (and be ignored) when some files still exist try { getFileSystem().delete(filePath.getParent(), false); } catch (IOException ignored) {} - so this is working as expected, although it causes a lot of garbage in hdfs logs... 2. The problem with not discarded checkpoints seems to be related to periods when we don't have any traffic (during night). At that point many checkpoints "expire before completing": 2016-05-13 00:00:10,585 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 199 @ 1463090410585 2016-05-13 00:10:10,585 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 199 expired before completing. 2016-05-13 00:25:14,650 [flink-akka.actor.default-dispatcher-280300] WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 199 When checkpoint manage to complete they take v. long to do so: 2016-05-13 00:25:19,071 [flink-akka.actor.default-dispatcher-280176] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 201 (in 308472 ms) - this is happening when no new messages arrive (we have simple process like kafka->keyBy->custom state aggregation->kafka, with EventTime time characteristic) I think I messed sth up with eventTime & generating watermarks - I'll have to check it. With RocksDB I made checkpoints at much larger intervals, so probably that's why I haven't noticed the disk is getting full. OTOH - shouldn't expired checkpoints be cleaned up automatically? Sorry for confustion and thanks for help thanks, maciek On 12/05/2016 21:28, Maciek Próchniak
wrote:
thanks, |
Free forum by Nabble | Edit this page |