CompletedCheckpoints are getting Stale ( Flink 1.4.2 )

Laura Uzcátegui
Hello, 

 At work, we are currently standing up a cluster with the following configuration: 

  • Flink version: 1.4.2 
  • HA enabled with ZooKeeper 
  • State backend: RocksDB 
  • state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints
  • state.backend.rocksdb.checkpointdir: hdfs://namenode:9000/flink/checkpoints
  • high-availability.storageDir: hdfs://namenode:9000/flink/recovery
We also have a job running with checkpointing enabled and without externalized checkpoints. 

We run this job multiple times a day, since it is launched from our integration-test pipeline. We have noticed that the number of completedCheckpoint files in the high-availability.storageDir folder keeps growing, which makes us wonder whether there is no cleanup policy for the filesystem when HA is enabled.

Under what circumstances would the number of completedCheckpoint files in the HA storage dir keep increasing when there is only a single job being run over and over again? 

Here is a sample of the files that accumulate over time, eventually reaching the maximum number of files allowed on the filesystem: 

completedCheckpoint00d86c01d8b9
completedCheckpoint00d86e9030a9
completedCheckpoint00d877b74355
completedCheckpoint00d87b3dd9ad
completedCheckpoint00d8815d9afd
completedCheckpoint00d88973195c
completedCheckpoint00d88b4792f2
completedCheckpoint00d890d499dc
completedCheckpoint00d91b00ada2

Cheers, 

Laura U.

Re: CompletedCheckpoints are getting Stale ( Flink 1.4.2 )

vino yang
Hi Laura,

First of all, Flink retains only one completed checkpoint by default [1]. Please check whether the number of files you see matches your configured retention count. If the configuration is as expected, there may be other causes:

1) Completed checkpoints are cleaned up by the JobManager (JM). Please confirm that it can access your files. [2]
2) The JM asynchronously removes the metadata of old completed checkpoints from ZooKeeper (ZK) in a background thread. Only after that cleanup succeeds does it delete the checkpoint data. If you can rule out the causes above, could you share the JM's log so that we can confirm whether this is the reason? I think it would also be appropriate to ping Till. [3]
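
To compare the number of files with the configured retention count, a minimal Python sketch along these lines may help. It assumes the usual `hdfs dfs -ls` output, in which the path is the last whitespace-separated field of each entry. The function names are illustrative and are not part of Flink or Hadoop.

```python
import subprocess
from typing import List

# File-name prefix seen in the listing from the original post.
PREFIX = "completedCheckpoint"


def completed_checkpoint_files(ls_output: str) -> List[str]:
    """Return the completedCheckpoint file names found in `hdfs dfs -ls` output."""
    names = []
    for line in ls_output.splitlines():
        fields = line.split()
        if not fields:
            continue
        # Assumption: the path is the last field of each entry. The
        # "Found N items" header is dropped by the prefix check below.
        name = fields[-1].rsplit("/", 1)[-1]
        if name.startswith(PREFIX):
            names.append(name)
    return names


def list_ha_dir(path: str) -> str:
    """Run `hdfs dfs -ls` on the HA storage directory (requires the hdfs CLI)."""
    result = subprocess.run(
        ["hdfs", "dfs", "-ls", path],
        check=True,
        capture_output=True,
        text=True,
    )
    return result.stdout
```

For example, `len(completed_checkpoint_files(list_ha_dir("hdfs://namenode:9000/flink/recovery")))` gives a count that can be compared with the expected number of retained checkpoints per job.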


Thanks, vino.

On Thu, Aug 30, 2018 at 10:52 PM, Laura Uzcátegui <[hidden email]> wrote:

Re: CompletedCheckpoints are getting Stale ( Flink 1.4.2 )

Stephan Ewen
Hi Laura!

Vino had good pointers. There really should be no case in which this is not cleaned up.

Is this a bounded job that ends? Is it always the last of the bounded job's checkpoints that remains?

Best,
Stephan


On Fri, Aug 31, 2018 at 5:02 AM, vino yang <[hidden email]> wrote:

Re: CompletedCheckpoints are getting Stale ( Flink 1.4.2 )

Laura Uzcátegui
Hi Stephan and Vino, 

Thanks for the quick reply and hints.

The number of checkpoints to retain is set to 1. 

This is an unbounded job, and I never see it finish. Since we tear down the cluster every time the integration test completes, I suspect the completedCheckpoint doesn't get deleted. When the integration test runs again, it picks up from the latest completedCheckpoint, but there are cases where that job doesn't run again, and so the completedCheckpoint goes stale. Is this something that could happen? 

Is there any way to check through logging whether the job reaches a globally terminal state before we tear down the cluster? 
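
For the logging question, one possible approach is to scan the JobManager log for job-level state transitions. The Python sketch below assumes log lines of the form `Job <name> (<job id>) switched from state <OLD> to <NEW>.`; that pattern is an assumption and should be verified against your own logs. The helper names are hypothetical. The sketch treats FINISHED, CANCELED, and FAILED as the globally terminal job states.

```python
import re
from typing import Optional

# Assumed JobManager log pattern (verify against your own logs):
#   Job <name> (<job id>) switched from state <OLD> to <NEW>.
STATE_CHANGE = re.compile(
    r"Job .* \((?P<job_id>[0-9a-f]+)\) "
    r"switched from state (?P<old>\w+) to (?P<new>\w+)\."
)

# Job states after which the job will not be recovered again.
GLOBALLY_TERMINAL = {"FINISHED", "CANCELED", "FAILED"}


def last_job_state(log_text: str, job_id: str) -> Optional[str]:
    """Return the last state the given job switched to, or None if not seen."""
    state = None
    for line in log_text.splitlines():
        match = STATE_CHANGE.search(line)
        if match and match.group("job_id") == job_id:
            state = match.group("new")
    return state


def reached_globally_terminal_state(log_text: str, job_id: str) -> bool:
    """True if the job's last logged transition ended in a terminal state."""
    return last_job_state(log_text, job_id) in GLOBALLY_TERMINAL
```

Running this check on the JobManager log just before the teardown step would show whether the job was still RUNNING when the cluster was removed.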

Cheers, 

Laura 

On Fri, 31 Aug 2018, 8:37 am Stephan Ewen, <[hidden email]> wrote:

Re: CompletedCheckpoints are getting Stale ( Flink 1.4.2 )

vino yang
Hi Laura:

This may well be possible. The path of a completed checkpoint file on HDFS has no hierarchy that identifies which job it belongs to; the file name is just a fixed prefix followed by a randomly generated string. My personal advice:

1) Verify the behavior on a clean cluster (first clean up any leftover metadata in Flink, ZooKeeper, and HDFS that might cause confusion);
2) Check the nodes and metadata under /checkpoints/${jobID}/ in ZooKeeper;
3) Check the logs for any related errors or anomalies.

Thanks, vino.

On Fri, Aug 31, 2018 at 3:51 PM, Laura Uzcátegui <[hidden email]> wrote:

Re: CompletedCheckpoints are getting Stale ( Flink 1.4.2 )

Stephan Ewen
One final thought: How do you stop the unbounded streaming application?

If you just kill the YARN/Mesos/K8s cluster, Flink will not know that this is a shutdown and will interpret it as a failure. Because of that, the checkpoints will remain (in DFS and in ZooKeeper).
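
A teardown step that cancels the job first and waits for a terminal state could look roughly like the Python sketch below. It relies on the `flink cancel <jobID>` CLI command. Everything else is an assumption made for illustration: `fetch_state` is a hypothetical callback that you would implement yourself (for example by parsing the JobManager log or querying the web monitor), and the timeout values are arbitrary.

```python
import time
from typing import Callable, List

# Job states after which Flink considers the job done for good.
GLOBALLY_TERMINAL = {"FINISHED", "CANCELED", "FAILED"}


def cancel_command(job_id: str) -> List[str]:
    """Build the CLI invocation that cancels a running job."""
    return ["flink", "cancel", job_id]


def cancel_and_wait(
    job_id: str,
    run: Callable[[List[str]], object],
    fetch_state: Callable[[str], str],
    timeout_s: float = 60.0,
    poll_s: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Cancel the job, then poll until it reaches a globally terminal state.

    Returns True if a terminal state was observed before the timeout,
    False otherwise. Tear the cluster down only after a True result.
    """
    run(cancel_command(job_id))
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        # fetch_state is a caller-supplied (hypothetical) status lookup.
        if fetch_state(job_id) in GLOBALLY_TERMINAL:
            return True
        sleep(poll_s)
    return False
```

In a pipeline, `run` could be `lambda cmd: subprocess.run(cmd, check=True)` (with `import subprocess`). Injecting `run` and `fetch_state` keeps the waiting logic testable without a live cluster.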

On Fri, Aug 31, 2018 at 2:18 PM, vino yang <[hidden email]> wrote: