checkpoints not being removed from HDFS


Maciek Próchniak
Hi,

We have a streaming job with quite a large state (a few GB); we're using the
FSStateBackend and storing checkpoints in HDFS.
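For reference, our setup looks roughly like this (a simplified sketch - the checkpoint path is the one from the logs below, the 30 s interval is inferred from the trigger timestamps, and everything else is illustrative):

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetupSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // keyed state is kept on the heap by FsStateBackend,
        // checkpoint data is written out to HDFS
        env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints_test"));

        // trigger a checkpoint every 30 seconds
        env.enableCheckpointing(30000);

        // ... kafka -> keyBy -> custom state aggregation -> kafka pipeline ...

        env.execute("checkpoint test job");
    }
}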
What we observe is that very often old checkpoints are not discarded
properly. In the Hadoop logs I can see:

2016-05-10 12:21:06,559 INFO BlockStateChange: BLOCK* addToInvalidates:
blk_1084791727_11053122 10.10.113.10:50010
2016-05-10 12:21:06,559 INFO org.apache.hadoop.ipc.Server: IPC Server
handler 9 on 8020, call
org.apache.hadoop.hdfs.protocol.ClientProtocol.delete from
10.10.113.9:49233 Call#12337 Retry#0
org.apache.hadoop.fs.PathIsNotEmptyDirectoryException:
`/flink/checkpoints_test/570d6e67d571c109daab468e5678402b/chk-62 is non
empty': Directory is not empty
         at
org.apache.hadoop.hdfs.server.namenode.FSDirDeleteOp.delete(FSDirDeleteOp.java:85)
         at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.delete(FSNamesystem.java:3712)

Meanwhile, on the Flink side (jobmanager log) we don't see any problems:
2016-05-10 12:20:22,636 [Checkpoint Timer] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
Triggering checkpoint 62 @ 1462875622636
2016-05-10 12:20:32,507 [flink-akka.actor.default-dispatcher-240088]
INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
Completed checkpoint 62 (in 9843 ms)
2016-05-10 12:20:52,637 [Checkpoint Timer] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
Triggering checkpoint 63 @ 1462875652637
2016-05-10 12:21:06,563 [flink-akka.actor.default-dispatcher-240028]
INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
Completed checkpoint 63 (in 13909 ms)
2016-05-10 12:21:22,636 [Checkpoint Timer] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
Triggering checkpoint 64 @ 1462875682636

I can see in the code that Flink issues these delete operations with the
recursive flag set to false - but I'm not sure why the directory contents
haven't already been deleted by that point.
When we were using the RocksDB backend we didn't run into this situation.
We're using Flink 1.0.1 and HDFS 2.7.2.

Does anybody have any idea why this could be happening?

thanks,
maciek



Re: checkpoints not being removed from HDFS

Ufuk Celebi
Hey Maciek,

thanks for reporting this. Having files linger around looks like a bug to me.

The idea behind setting the recursive flag to false in the
AbstractFileStateHandle.discardState() call is that a
FileStateHandle is actually just a single file and not a directory.
The second call, which tries to delete the parent directory, only succeeds
once all other files in that directory have been deleted as well. I
think this is what sometimes fails when there are many state handles. For
RocksDB there is only a single state handle, which is why it works well there.
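
Per state handle, the discard logic is roughly the following (a simplified, standalone sketch written against the Hadoop FileSystem API for illustration - the class and method names here are not the real Flink ones):

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DiscardSketch {

    // Roughly what happens when a single file-backed state handle is discarded.
    static void discardState(FileSystem fs, Path stateFile) throws IOException {
        // 1) delete the single file backing this state handle
        fs.delete(stateFile, false);

        // 2) best-effort attempt to remove the parent chk-XX directory;
        //    recursive = false, so this only succeeds for the handle whose
        //    delete happens to empty the directory - all other attempts fail
        //    and the failure is ignored
        try {
            fs.delete(stateFile.getParent(), false);
        } catch (IOException ignored) {
            // expected while sibling state files still exist
        }
    }
}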

I will open an issue for this and try to reproduce it reliably and then fix it.

– Ufuk


Re: checkpoints not being removed from HDFS

Ufuk Celebi
The issue is here: https://issues.apache.org/jira/browse/FLINK-3902

(My "explanation" before dosn't make sense actually and I don't see a
reason why this should be related to having many state handles.)

Re: checkpoints not being removed from HDFS

Maciek Próchniak
thanks,
I'll try to reproduce it in a test myself...

maciek

Re: checkpoints not being removed from HDFS

Maciek Próchniak
Hi Ufuk,

It seems I messed things up a bit :)
I can't comment on the JIRA issue, since it's temporarily locked...

1. org.apache.hadoop.fs.PathIsNotEmptyDirectoryException:
`/flink/checkpoints_test/570d6e67d571c109daab468e5678402b/chk-62 is non
empty': Directory is not empty - this seems to be expected behaviour, because AbstractFileStateHandle.discardState() does the following:

// send a call to delete the checkpoint directory containing the file. This will
// fail (and be ignored) when some files still exist
try {
   getFileSystem().delete(filePath.getParent(), false);
} catch (IOException ignored) {}

- so this is working as expected, although it does generate a lot of noise in the HDFS logs...
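
(Just to double-check the HDFS side, a standalone snippet along these lines reproduces the same namenode log entry - the paths are purely illustrative:)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class NonRecursiveDeleteDemo {

    public static void main(String[] args) throws Exception {
        // picks up fs.defaultFS from the Hadoop configuration on the classpath
        FileSystem fs = FileSystem.get(new Configuration());

        Path dir = new Path("/tmp/chk-demo");
        Path file = new Path(dir, "state-file");
        fs.mkdirs(dir);
        fs.create(file).close();

        try {
            // recursive = false on a non-empty directory fails on HDFS
            // with PathIsNotEmptyDirectoryException (an IOException)
            fs.delete(dir, false);
        } catch (java.io.IOException expected) {
            System.out.println("as in the namenode log: " + expected.getMessage());
        }

        // once the last file is gone, the non-recursive delete succeeds
        fs.delete(file, false);
        fs.delete(dir, false);
    }
}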

2. The problem with checkpoints not being discarded seems to be related to periods when we don't have any traffic (during the night).
At that point many checkpoints "expire before completing":
2016-05-13 00:00:10,585 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 199 @ 1463090410585
2016-05-13 00:10:10,585 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint 199 expired before completing.
2016-05-13 00:25:14,650 [flink-akka.actor.default-dispatcher-280300] WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 199

When checkpoints do manage to complete, they take very long to do so:
2016-05-13 00:25:19,071 [flink-akka.actor.default-dispatcher-280176] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 201 (in 308472 ms)

- this happens when no new messages arrive (the job is a simple pipeline like kafka -> keyBy -> custom state aggregation -> kafka, with the EventTime time characteristic).
I think I messed something up with event time and watermark generation - I'll have to check that.
With RocksDB I made checkpoints at much larger intervals, so that's probably why I didn't notice the disk getting full there.
OTOH - shouldn't expired checkpoints be cleaned up automatically?
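
(If the slow checkpoints during idle periods turn out to be expected, one workaround on our side would be to raise the checkpoint timeout above the 10-minute default visible in the logs - something like the sketch below; I'm quoting the CheckpointConfig API from memory, so the exact method names may differ in 1.0.1:)

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTimeoutSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints_test"));

        // still trigger a checkpoint every 30 seconds ...
        env.enableCheckpointing(30000);

        // ... but allow each checkpoint up to 20 minutes before it is
        // declared expired (illustrative value)
        env.getCheckpointConfig().setCheckpointTimeout(20 * 60 * 1000);

        // ... rest of the job ...
        env.execute("checkpoint test job");
    }
}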


Sorry for the confusion and thanks for the help

thanks,
maciek

