Task manager local state data after crash / recovery

Task manager local state data after crash / recovery

dhanesh arole
Hey all, 

We are running a stateful stream processing job on k8s using the per-job standalone deployment entrypoint. Flink version: 1.12.1

Problem: We have observed that whenever a task manager is either gracefully shut down or killed (due to OOM, a k8s worker node drain, etc.), it doesn't clean up the RocksDB state directories on its local disk. When the task manager restarts and receives new task allocations from the resource manager, it rebuilds its local state for those tasks from the previous completed checkpoint. Over time, after multiple restarts, the task manager's local disk ends up accumulating lots of such orphaned RocksDB directories.

Questions: This isn't causing us any functional issues, but it adds a lot of recurring ops overhead to clean up these disks periodically. As a workaround, we are thinking of cleaning the local RocksDB directories, except for the taskmanager.state.local.root-dirs, before starting the task manager java process (a rough sketch of what we mean follows after the next paragraph). Since the keyed state backends for allocated tasks are restored on every task manager restart anyway, we feel this is the safest option at the moment and would solve our problem of ever-growing disks on the task manager pods. Is it safe to do so, or are there other consequences? Is there any config or restart policy that takes care of cleaning up such stale RocksDB directories during the state backend restore process?

A similar kind of cleanup is required when local task recovery is enabled. Whenever the task manager is not shut down gracefully, the old localState doesn't get cleaned up on the next restart either, which also wastes a lot of disk space. It's easy to delete the RocksDB working directories from the previous run, but not so straightforward for localState, as one has to figure out which of the allocation IDs are actually stale and clean only those, or check the latest completed checkpoint and delete all localState directories belonging to older checkpoints and allocation IDs. Is there any other solution to this problem? We would also like to learn from other users how you are currently handling these operational tasks.
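
To make the idea concrete, the kind of pre-start cleanup we have in mind looks roughly like the snippet below. This is only a sketch: the class and method names are made up, the paths match our configuration (listed further down), and it assumes that no other task manager shares the disk and that the set of allocation IDs worth keeping can be determined at all, which is exactly the part we are unsure about for localState.

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.Comparator;
    import java.util.Set;
    import java.util.stream.Stream;

    /**
     * Hypothetical pre-start cleanup, intended to run before the task manager
     * JVM is launched. Deletes leftover RocksDB working directories under the
     * state root, and stale allocation sub-directories under localState.
     */
    public class PreStartCleanup {

        public static void main(String[] args) throws IOException {
            Path stateRoot = Paths.get("/data/flink");               // RocksDB db storage path in our setup
            Path localStateRoot = stateRoot.resolve("localState");   // created by local recovery
            // Allocation IDs we still want to keep; after a hard crash this would
            // typically be empty, but determining it reliably is the open question.
            Set<String> liveAllocationDirs = Set.of(args);

            // 1) Remove old RocksDB working directories, but keep the local-recovery root.
            deleteChildrenExcept(stateRoot, Set.of("localState"));

            // 2) Remove localState allocation directories that are no longer live.
            if (Files.isDirectory(localStateRoot)) {
                deleteChildrenExcept(localStateRoot, liveAllocationDirs);
            }
        }

        private static void deleteChildrenExcept(Path root, Set<String> keep) throws IOException {
            try (Stream<Path> children = Files.list(root)) {
                for (Path child : (Iterable<Path>) children::iterator) {
                    if (Files.isDirectory(child) && !keep.contains(child.getFileName().toString())) {
                        deleteRecursively(child);
                    }
                }
            }
        }

        private static void deleteRecursively(Path dir) throws IOException {
            try (Stream<Path> walk = Files.walk(dir)) {
                // children must be deleted before their parents
                for (Path p : (Iterable<Path>) walk.sorted(Comparator.reverseOrder())::iterator) {
                    Files.delete(p);
                }
            }
        }
    }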

Configurations:
state.backend.local-recovery: true
taskmanager.state.local.root-dirs: /data/flink/
RocksDB backend DB storage path: /data/flink (set programmatically)
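
For completeness, the programmatic part of the RocksDB configuration is roughly the following (a sketch of our job setup; the checkpoint URI and job name are placeholders):

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class JobSetupSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Checkpoints go to durable storage; the URI below is a placeholder.
            RocksDBStateBackend backend =
                    new RocksDBStateBackend("s3://<our-checkpoint-bucket>/checkpoints", true);
            // This is where the RocksDB instances keep their working directories on local disk.
            backend.setDbStoragePath("/data/flink");
            env.setStateBackend(backend);

            // ... sources, operators, sinks ...
            env.execute("our-stateful-job");
        }
    }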

Dhanesh Arole

Re: Task manager local state data after crash / recovery

Till Rohrmann
Hi Dhanesh,

The way local state currently works in Flink is the following: the user configures `taskmanager.state.local.root-dirs` (otherwise the tmp directory is used), and there Flink creates a "localState" directory. This is the base directory for all local state. Within this directory, a TaskManager creates a sub-directory for every allocation, named after the `AllocationID`. Inside this directory, Flink then stores the local state artefacts.
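
Schematically it looks like this in your setup (the allocation IDs are just made-up examples, and the sub-structure below them is omitted):

    /data/flink/localState/                  <- base directory for all local state
        <allocation-id-1>/                   <- one sub-directory per AllocationID
            ... local state artefacts ...
        <allocation-id-2>/
            ... local state artefacts ...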

When Flink frees an allocation, the corresponding directory is deleted. For the case that the process is killed via a SIGTERM signal, Flink also registers a shutdown hook which tries to delete the directories of all known `AllocationID`s. If the shutdown hooks do not run (e.g. the process is killed via SIGKILL), then Flink leaves some residual state behind.
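
Just to illustrate the mechanism: this is a plain JVM shutdown hook, so conceptually something like the toy example below (not Flink's actual code). It runs on a normal exit and on SIGTERM, but never on SIGKILL, because the JVM gets no chance to execute any code in that case.

    public class ShutdownHookIllustration {
        public static void main(String[] args) throws InterruptedException {
            // Executed on normal exit and SIGTERM, skipped entirely on SIGKILL.
            Runtime.getRuntime().addShutdownHook(
                    new Thread(() -> System.out.println("deleting known allocation directories ...")));
            Thread.sleep(60_000); // try `kill <pid>` vs. `kill -9 <pid>` while this sleeps
        }
    }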

Now the problem is what happens when the TaskManager process is restarted on the same machine. In this case, Flink will simply use the same local state directory but ignore existing allocation-id sub-directories. The reason is that Flink does not know whether these allocation-id sub-directories are in use by another Flink process running on the same machine. To make this decision, Flink would have to know that it is the owner of these sub-directories. This could work if each TaskManager process were started with a unique ID that is reused across restart attempts, but this is currently not the case for every deployment.

Long story short, it is currently expected that Flink can leave some residual state in case of a hard process stop. Cleaning this state up is at the moment unfortunately the responsibility of the user.

Cheers,
Till

Re: Task manager local state data after crash / recovery

dhanesh arole
Thanks a lot for answering this in such detail. It makes sense and has cleared up a lot of doubts.

--
- Dhanesh ( sent from my mobile device. Pardon me for any typos )