How to know if task-local recovery kicked in for some nodes?


How to know if task-local recovery kicked in for some nodes?

Sonam Mandal
Hello,

We are experimenting with task-local recovery, and I wanted to know whether there is a way to validate that some tasks of the job recovered from local state rather than from remote state.

We currently have 2 TaskManagers with 2 slots each, and we run a job with parallelism 4. To simulate failure, we kill one of the TaskManager pods (we run on Kubernetes). I want to see whether the local state of the other TaskManager was used. I understand that the state for the killed TaskManager will need to be fetched from the checkpoint.

Also, do you have any suggestions on how to test such failure scenarios in a better way?

Thanks,
Sonam

Re: How to know if task-local recovery kicked in for some nodes?

Tzu-Li (Gordon) Tai
Hi Sonam,

Pulling in Till (cc'ed); I believe he should be able to help you here.

Cheers,
Gordon

In reply to Sonam Mandal (Fri, Apr 2, 2021 at 8:18 AM).

Re: How to know if task-local recovery kicked in for some nodes?

Till Rohrmann
Hi Sonam,

The easiest way to see whether local state has been used for recovery is the recovery time. Apart from that, you can also look in the logs for "Found registered local state for checkpoint {} in subtask ({} - {} - {}", which is logged at DEBUG level. This message indicates that local state is available; however, it does not say whether it is actually used. For example, when doing a rescaling operation we change the assignment of key-group ranges, which prevents local state from being used. In the case of a recovery, however, the above-mentioned log message should indicate that we use local state recovery.

Cheers,
Till

In reply to Tzu-Li (Gordon) Tai (Tue, Apr 6, 2021 at 11:31 AM).
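Till's second suggestion, searching the TaskManager logs for the DEBUG message, can be scripted. The sketch below is a hypothetical helper (`LocalStateLogScanner` is not part of Flink). It matches the message prefix quoted above and extracts the four placeholders, assuming they are the checkpoint ID, job ID, job vertex ID and subtask index, in that order. As Till notes, a hit only shows that local state was found, not that it was used. The message only appears if the TaskManager logs at DEBUG level for the class that emits it; we believe that class is `org.apache.flink.runtime.state.TaskLocalStateStoreImpl`, but treat this as an assumption and check it against your Flink version.

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: finds "local state available" DEBUG lines in TaskManager logs. */
public class LocalStateLogScanner {

    // Prefix of the DEBUG message quoted in the thread. The capture groups ASSUME the
    // placeholders are: checkpoint id, job id, job vertex id, subtask index.
    private static final Pattern FOUND_LOCAL_STATE = Pattern.compile(
            "Found registered local state for checkpoint (\\d+) in subtask \\((\\S+) - (\\S+) - (\\d+)");

    /** One matching log line: local state was registered for this subtask and checkpoint. */
    public record Hit(long checkpointId, String jobId, String vertexId, int subtaskIndex) {}

    /** Returns one Hit per line containing the message; all other lines are ignored. */
    public static List<Hit> parse(List<String> lines) {
        List<Hit> hits = new ArrayList<>();
        for (String line : lines) {
            Matcher m = FOUND_LOCAL_STATE.matcher(line);
            if (m.find()) {
                hits.add(new Hit(
                        Long.parseLong(m.group(1)),
                        m.group(2),
                        m.group(3),
                        Integer.parseInt(m.group(4))));
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        // Reads a TaskManager log from stdin, e.g. piped from `kubectl logs <pod>`.
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in, StandardCharsets.UTF_8));
        List<String> lines = in.lines().toList();
        for (Hit h : parse(lines)) {
            System.out.printf("checkpoint=%d job=%s vertex=%s subtask=%d%n",
                    h.checkpointId(), h.jobId(), h.vertexId(), h.subtaskIndex());
        }
    }
}
```

Usage (Java 16+ for `record` and `Stream.toList()`): `kubectl logs <taskmanager-pod> | java LocalStateLogScanner.java`. Comparing the subtasks reported here with the subtasks that ran on the killed pod should show which ones had local state available.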

Re: How to know if task-local recovery kicked in for some nodes?

dhanesh arole
Hi Sonam,

We have a similar setup. What I have observed is that when the TaskManager pod gets killed and restarts (i.e. the entire TaskManager process restarts), local recovery doesn't happen. The TaskManager's restore process actually downloads the latest completed checkpoint from the remote state handle even when older local state data is available. This happens because the allocation IDs of the tasks running on the TaskManager change with every run, since a TaskManager restart causes a global job failure and restart.

Local recovery (i.e. the task restore process using locally stored checkpoint data) kicks in when the TaskManager process stays alive but one of the tasks fails for some other reason (like a timeout from a sink or an external dependency) and the Flink job gets restarted by the JobManager.

Please correct me if I'm wrong.


Dhanesh Arole

In reply to Till Rohrmann (Tue, Apr 6, 2021 at 11:35 AM).
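Dhanesh's explanation hinges on local state being tied to the slot allocation rather than to the machine. The toy model below is purely illustrative: none of these classes are Flink APIs, and it assumes that a local snapshot can only be found under the same (allocation ID, subtask index) it was stored with. A task that fails and restarts in the same allocation on a live TaskManager finds its local copy; a restarted TaskManager process hands out fresh allocation IDs, so the lookup misses and the state is fetched remotely.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Toy model (NOT Flink code): local snapshots keyed by slot allocation, as described in the thread. */
public class AllocationKeyedLocalStore {

    /** Hypothetical lookup key: a snapshot is only visible to the allocation that wrote it. */
    public record Key(String allocationId, int subtaskIndex) {}

    // Maps (allocation, subtask) to the id of the checkpoint stored locally for it.
    private final Map<Key, Long> localCheckpoints = new HashMap<>();

    public void storeLocal(String allocationId, int subtask, long checkpointId) {
        localCheckpoints.put(new Key(allocationId, subtask), checkpointId);
    }

    /** Returns "local" if a snapshot exists for this allocation and subtask, otherwise "remote". */
    public String restoreSource(String allocationId, int subtask) {
        return localCheckpoints.containsKey(new Key(allocationId, subtask)) ? "local" : "remote";
    }

    /** A fresh, random allocation id, standing in for what a restarted TaskManager would hand out. */
    public static String newAllocationId() {
        return UUID.randomUUID().toString();
    }

    public static void main(String[] args) {
        AllocationKeyedLocalStore store = new AllocationKeyedLocalStore();
        String allocation = newAllocationId();
        store.storeLocal(allocation, 0, 7L);

        // Task failure while the TaskManager process stays alive: the same allocation is reused.
        System.out.println("task failure:       " + store.restoreSource(allocation, 0));

        // TaskManager process restart: the model assigns a brand-new allocation id.
        System.out.println("TM process restart: " + store.restoreSource(newAllocationId(), 0));
    }
}
```

Running `main` prints `local` for the task-failure case and `remote` for the process-restart case. Other factors, such as whether the local state directory survives a pod restart, are outside this model.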

Re: How to know if task-local recovery kicked in for some nodes?

Till Rohrmann
Hi Dhanesh,

If some of the previously used TMs are still available, then Flink should try to redeploy tasks onto them, also in the case of a global failover. Only the tasks that were running on the lost TaskManager will need new slots and have to download their state from remote storage.

Cheers,
Till

In reply to dhanesh arole (Tue, Apr 6, 2021 at 5:35 PM).
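Till's point can be illustrated with Sonam's original setup (2 TaskManagers with 2 slots each, parallelism 4, one TaskManager killed). The toy model below is not Flink's scheduler. It only encodes the behaviour described in this thread: after a failover, each subtask goes back to its previous TaskManager if that TaskManager survived, and can then restore from local state; otherwise it gets a slot on a replacement TaskManager and must download its state remotely. The TaskManager names, and the assumption that subtasks 0 and 1 ran on the first TaskManager and 2 and 3 on the second, are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Toy model (NOT Flink's scheduler): prefer each subtask's previous TaskManager if it survived. */
public class StickyRedeploySketch {

    /** Where a subtask lands after failover and where its state would come from. */
    public record Placement(int subtask, String taskManager, String stateSource) {}

    /**
     * previousTm[i] is the TaskManager that ran subtask i before the failure.
     * Subtasks whose TaskManager survived go back to it and can use local state;
     * the others get a slot on the replacement TaskManager and must download remote state.
     */
    public static List<Placement> redeploy(String[] previousTm, Set<String> survivors, String replacementTm) {
        List<Placement> result = new ArrayList<>();
        for (int subtask = 0; subtask < previousTm.length; subtask++) {
            String tm = previousTm[subtask];
            if (survivors.contains(tm)) {
                result.add(new Placement(subtask, tm, "local"));
            } else {
                result.add(new Placement(subtask, replacementTm, "remote"));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical placement for 2 TaskManagers x 2 slots, parallelism 4; tm-2 is killed.
        String[] previous = {"tm-1", "tm-1", "tm-2", "tm-2"};
        for (Placement p : redeploy(previous, Set.of("tm-1"), "tm-2-replacement")) {
            System.out.println(p);
        }
    }
}
```

In this model, subtasks 0 and 1 restore locally on `tm-1`, while subtasks 2 and 3 move to the replacement and restore from the checkpoint, which matches what Sonam expected. Dhanesh's later message notes that in their rolling deployments the real allocation did not always stay this stable.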

Re: How to know if task-local recovery kicked in for some nodes?

dhanesh arole
Hi Till, 

You are right. To give you more context about our setup: we run 1 task slot per TaskManager, and the total number of TaskManager replicas equals the job parallelism. The issue is actually exacerbated during a rolling deployment of the TaskManagers, as each TM goes offline and comes back online after some time. While each TM pod is being bounced, the task allocation somehow changes, and the job finally stabilises once all TMs have restarted. Maybe a proper blue-green setup would let us make the best use of local recovery during TM restarts. But during intermittent failures of one of the TMs, local recovery works as expected on the other, healthy TM instances (i.e. they do not download from remote).

In reply to Till Rohrmann (Wed, 7 Apr 2021 at 10:35).
--
- Dhanesh ( sent from my mobile device. Pardon me for any typos )

Re: How to know if task-local recovery kicked in for some nodes?

Sonam Mandal
Hi Till and Dhanesh,

Thanks for the insights, both on how to check that this kicks in and on the expected behavior. My understanding too was that if multiple TMs are used for the job, any TMs that don’t go down can take advantage of local recovery.

Do you have any suggestion for a good minimum state size to experiment with, in order to see recovery-time differences between the two modes?

Thanks,
Sonam 

In reply to dhanesh arole (Wednesday, April 7, 2021 3:43:11 AM).

Re: How to know if task-local recovery kicked in for some nodes?

Till Rohrmann
Hi Sonam,

The state size probably depends a bit on your infrastructure. Assuming you have a 1 GBps network connection and local SSDs, I guess you should see a difference if your local state size is > 1 GB.

Cheers,
Till

In reply to Sonam Mandal (Wed, Apr 7, 2021 at 1:46 PM).
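Till's rule of thumb can be sanity-checked with back-of-the-envelope arithmetic. The remote path has to move the whole state over the network, so a lower bound on its cost is roughly state size divided by bandwidth. The sketch below assumes a 1 GB state and evaluates both readings of "1 GBps": 1 gigabit/s (the usual NIC rating) and 1 gigabyte/s. It ignores SSD read time, throughput limits of the remote store, and the cost of rebuilding the state backend, so the numbers are only a lower bound for the download component.

```java
/** Back-of-the-envelope lower bound for downloading state over the network (ignores all other costs). */
public class DownloadTimeEstimate {

    /** Seconds to move stateBytes over a link of bitsPerSecond, assuming the link is fully saturated. */
    public static double seconds(double stateBytes, double bitsPerSecond) {
        return stateBytes * 8.0 / bitsPerSecond;
    }

    public static void main(String[] args) {
        double oneGB = 1e9;      // 1 GB of state (decimal gigabyte, assumption)
        double oneGbit = 1e9;    // 1 gigabit/s link
        double oneGByte = 8e9;   // 1 gigabyte/s link, expressed in bits/s
        System.out.printf("1 GB at 1 Gbit/s:  %.1f s%n", seconds(oneGB, oneGbit));   // 8.0 s
        System.out.printf("1 GB at 1 GByte/s: %.1f s%n", seconds(oneGB, oneGByte)); // 1.0 s
    }
}
```

At 1 Gbit/s, moving 1 GB costs at least about 8 s per TaskManager before any other overhead, which is consistent with Till's suggestion that the difference should become noticeable somewhere beyond 1 GB of state.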

Re: How to know if task-local recovery kicked in for some nodes?

Sonam Mandal
Hi Till,

Got it, that definitely makes sense; I was just looking for a ballpark number to start with. Appreciate your help!

Thanks,
Sonam

In reply to Till Rohrmann (Monday, April 12, 2021 1:00 AM).