Hello,
We've been experimenting with Task-local recovery using Kubernetes. We have a way to specify mounting the same disk across Task Manager restarts/deletions for when the pods get recreated. In this scenario, we noticed that task local recovery does not kick in
(as expected based on the documentation).
We did try to comment out the code on the shutdown path which cleaned up the task local directories before the pod went down / was restarted. We noticed that remote recovery kicked in even though the task local state was present. I noticed that the slot IDs
changed, and was wondering if this is the main reason that the task local state didn't get used in this scenario?
Since we're using this shared disk to store the local state across pod failures, would it make sense to allow keeping the task local state so that we can
get faster recovery even for situations where the Task Manager itself dies? In some sense, the storage here is disaggregated from the pods and can potentially benefit from task local recovery. Any reason why this is a bad idea in general?
Is there a way to preserve the slot IDs across restarts? We setup the Task Manager to pin the resource-id, but that didn't seem to help. My understanding
is that the slot ID needs to be reused for task local recovery to kick in.
Thanks,
Sonam
|
/cc dev@flink On Tue, Apr 20, 2021 at 1:29 AM Sonam Mandal <[hidden email]> wrote:
|
Hi Sonam, sorry for the late reply. We were a bit caught in the midst of the feature freeze for the next major Flink release. In general, I think it is a very good idea to disaggregate the local state storage to make it reusable across TaskManager failures. However, it is also not trivial to do. Maybe let me first describe how the current task local recovery works and then see how we could improve it: Flink creates for every slot allocation an AllocationID. The AllocationID associates a slot on a TaskExecutor with a job and is also used for scoping the lifetime of a slot wrt a job (theoretically, one and the same slot could be used to fulfill multiple slot requests of the same job if the slot allocation is freed in between). Note that the AllocationID is a random ID and, thus, changes whenever the ResourceManager allocates a new slot on a TaskExecutor for a job. Task local recovery is effectively a state cache which is associated with an AllocationID. So for every checkpoint and every task, a TaskExecutor copies the state data and stores them in the task local recovery cache. The cache is maintained as long as the slot allocation is valid (e.g. the slot has not been freed by the JobMaster and the slot has not timed out). This makes the lifecycle management of the state data quite easy and makes sure that a process does not clutter local disks. On the JobMaster side, Flink remembers for every Execution, where it is deployed (it remembers the AllocationID). If a failover happens, then Flink tries to re-deploy the Executions into the slots they were running in before by matching the AllocationIDs. The reason why we scoped the state cache to an AllocationID was for simplicity and because we couldn't guarantee that a failed TaskExecutor X will be restarted on the same machine again and thereby having access to the same local disk as before. That's also why Flink deletes the cache directory when a slot is freed or when the TaskExecutor is shut down gracefully. With persistent volumes this changes and we can make the TaskExecutors "stateful" in the sense that we can reuse an already occupied cache. One rather simple idea could be to also persist the slot allocations of a TaskExecutor (which slot is allocated and what is its assigned AllocationID). This information could be used to re-initialize the TaskExecutor upon restart. That way, it does not have to register at the ResourceManager and wait for new slot allocations but could directly start offering its slots to the jobs it remembered. If the TaskExecutor cannot find the JobMasters for the respective jobs, it would then free the slots and clear the cache accordingly. This could work as long as the ResourceManager does not start new TaskExecutors whose slots could be used to recover the job. If this is a problem, then one needs to answer the question how long to wait for the old TaskExecutors to come back and reusing their local state vs. starting quickly a fresh instance but having to restore state remotely. An alternative solution proposal which is probably more powerful albeit also more complex would be to make the cache information explicit when registering the TaskExecutor at the ResourceManager and later offering slots to the JobMaster. For example, the TaskExecutor could tell the ResourceManager which states it has locally cached (it probably needs to contain key group ranges for every stored state) and this information could be used to decide from which TaskExecutor to allocate slots for a job. Similarly on the JobMaster side we could use this information to calculate the best mapping between Executions and slots. I think that mechanism could better deal with rescaling events where there is no perfect match between Executions and slots because of the changed key group ranges. So to answer your question: There is currently no way to preserve AllocationIDs across restarts. However, we could use the persistent volume to store this information so that we can restore it on restart of a TaskExecutor. This could enable task local state recovery for cases where we lose a TaskExecutor and restart it with the same persistent volume. Cheers, Till On Wed, Apr 21, 2021 at 7:26 PM Stephan Ewen <[hidden email]> wrote: /cc dev@flink |
Hi Till,
Thanks for getting back to me. Apologies for my delayed response.
Thanks for confirming that the slot ID (Allocation ID) is indeed necessary today for task local recovery to kick in, and thanks for your insights on how to make this work.
We are interested in exploring this disaggregation between local state storage and slots to allow potential reuse of local state even when TMs go down.
I'm planning to spend some time exploring the Flink code around local recovery and state persistence. I'm still new to Flink, so any guidance will be helpful. I
think both of your ideas on how to make this happen are interesting and worth exploring. What's the procedure to collaborate or get guidance on this feature? Will a FLIP be required, or will opening a ticket do?
Thanks,
Sonam
From: Till Rohrmann <[hidden email]>
Sent: Monday, April 26, 2021 10:24 AM To: dev <[hidden email]> Cc: [hidden email] <[hidden email]>; Sonam Mandal <[hidden email]> Subject: Re: Task Local Recovery with mountable disks in the cloud Hi Sonam,
sorry for the late reply. We were a bit caught in the midst of the feature freeze for the next major Flink release.
In general, I think it is a very good idea to disaggregate the local state storage to make it reusable across TaskManager failures. However, it is also not trivial to do.
Maybe let me first describe how the current task local recovery works and then see how we could improve it:
Flink creates for every slot allocation an AllocationID. The AllocationID associates a slot on a TaskExecutor with a job and is also used for scoping the lifetime of a slot wrt a job (theoretically, one and the same slot could be used to fulfill multiple
slot requests of the same job if the slot allocation is freed in between). Note that the AllocationID is a random ID and, thus, changes whenever the ResourceManager allocates a new slot on a TaskExecutor for a job.
Task local recovery is effectively a state cache which is associated with an AllocationID. So for every checkpoint and every task, a TaskExecutor copies the state data and stores them in the task local recovery cache. The cache is maintained as long as
the slot allocation is valid (e.g. the slot has not been freed by the JobMaster and the slot has not timed out). This makes the lifecycle management of the state data quite easy and makes sure that a process does not clutter local disks. On the JobMaster side,
Flink remembers for every Execution, where it is deployed (it remembers the AllocationID). If a failover happens, then Flink tries to re-deploy the Executions into the slots they were running in before by matching the AllocationIDs.
The reason why we scoped the state cache to an AllocationID was for simplicity and because we couldn't guarantee that a failed TaskExecutor X will be restarted on the same machine again and thereby having access to the same local disk as before. That's
also why Flink deletes the cache directory when a slot is freed or when the TaskExecutor is shut down gracefully.
With persistent volumes this changes and we can make the TaskExecutors "stateful" in the sense that we can reuse an already occupied cache. One rather simple idea could be to also persist the slot allocations of a TaskExecutor (which slot is allocated
and what is its assigned AllocationID). This information could be used to re-initialize the TaskExecutor upon restart. That way, it does not have to register at the ResourceManager and wait for new slot allocations but could directly start offering its slots
to the jobs it remembered. If the TaskExecutor cannot find the JobMasters for the respective jobs, it would then free the slots and clear the cache accordingly.
This could work as long as the ResourceManager does not start new TaskExecutors whose slots could be used to recover the job. If this is a problem, then one needs to answer the question how long to wait for the old TaskExecutors to come back and reusing
their local state vs. starting quickly a fresh instance but having to restore state remotely.
An alternative solution proposal which is probably more powerful albeit also more complex would be to make the cache information explicit when registering the TaskExecutor at the ResourceManager and later offering slots to the JobMaster. For example, the
TaskExecutor could tell the ResourceManager which states it has locally cached (it probably needs to contain key group ranges for every stored state) and this information could be used to decide from which TaskExecutor to allocate slots for a job. Similarly
on the JobMaster side we could use this information to calculate the best mapping between Executions and slots. I think that mechanism could better deal with rescaling events where there is no perfect match between Executions and slots because of the changed
key group ranges.
So to answer your question: There is currently no way to preserve AllocationIDs across restarts. However, we could use the persistent volume to store this information so that we can restore it on restart of a TaskExecutor. This could enable task local
state recovery for cases where we lose a TaskExecutor and restart it with the same persistent volume.
Cheers,
Till
On Wed, Apr 21, 2021 at 7:26 PM Stephan Ewen <[hidden email]> wrote:
/cc dev@flink |
Hi Sonam, I think it would be great to create a FLIP for this feature. FLIPs don't have to be super large and in this case, I could see it work to express the general idea to make local recovery work across TaskManager failures and then outline the different ideas we had so far. If we then decide to go with the persisting of cache information (the AllocationIDs), then this could be a good outcome. If we decide to go with the more complex solution of telling the ResourceManager and JobMaster about the ranges of cached state data, then this is also ok. Cheers, Till On Fri, May 7, 2021 at 6:30 PM Sonam Mandal <[hidden email]> wrote:
|
Hi Till,
Sure, that sounds good. I'll open a FLIP for this when we start working on it.
Thanks for the insights!
Regards,
Sonam
From: Till Rohrmann <[hidden email]>
Sent: Monday, May 10, 2021 2:26 AM To: Sonam Mandal <[hidden email]> Cc: dev <[hidden email]>; [hidden email] <[hidden email]> Subject: Re: Task Local Recovery with mountable disks in the cloud Hi Sonam,
I think it would be great to create a FLIP for this feature. FLIPs don't have to be super large and in this case, I could see it work to express the general idea to make local recovery work across TaskManager failures and then outline the different ideas
we had so far. If we then decide to go with the persisting of cache information (the AllocationIDs), then this could be a good outcome. If we decide to go with the more complex solution of telling the ResourceManager and JobMaster about the ranges of cached
state data, then this is also ok.
Cheers,
Till
On Fri, May 7, 2021 at 6:30 PM Sonam Mandal <[hidden email]> wrote:
|
Just a side input, not only the persistent volume could help with keeping the local state for the TaskManager pod, but also the ephemeral storage. Ephemeral storage is bound to the lifecycle of TaskManager pod. And it could be shared between different restarts of TaskManager container. Best, Yang Sonam Mandal <[hidden email]> 于2021年5月11日周二 上午1:02写道:
|
Free forum by Nabble | Edit this page |