Failure detection in Flink

Failure detection in Flink

Sonam Mandal
Hello,

I'm looking for some resources around failure detection in Flink between the various components such as Task Manager, Job Manager, Resource Manager, etc. For example, how does the Job Manager detect that a Task Manager is down (long GC pause or it just crashed)?

There is some indication of the use of heartbeats. Is this done via Akka death watches or a custom heartbeat implementation? The reason I ask is that some timeout configurations are Akka-related, whereas others aren't. I would like to understand which timeouts are relevant to which pieces.

For example, akka.transport.heartbeat.interval vs. heartbeat.interval. I see some earlier posts that mention akka.watch.heartbeat.interval, though this option is no longer present on the latest configuration page for Flink.

Also, is this failure detection mechanism the same irrespective of the deployment model, i.e. Kubernetes/YARN/Mesos?

Thanks,
Sonam


Re: Failure detection in Flink

Till Rohrmann
Hi Sonam,

Flink uses its own heartbeat implementation to detect failures of components. This mechanism is independent of the deployment model. The relevant configuration options can be found here [1].

The akka.transport.* options only configure the underlying Akka system. Since we are using TCP, Akka's failure detector is not needed [2]. I think we should remove these options in order to avoid confusion [3].
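
To make the distinction concrete, here is a minimal flink-conf.yaml sketch of the Flink-level heartbeat options. The values shown are the documented defaults at the time of writing; tune them to your environment:

    # Flink's own heartbeat mechanism, used for failure detection between
    # the JobManager, TaskManagers and ResourceManager (milliseconds):
    heartbeat.interval: 10000
    heartbeat.timeout: 50000

The akka.transport.* keys, by contrast, only tune the Akka transport layer and play no part in this failure detection.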

The community has also been thinking about improving the failure detection mechanism, because in some deployment scenarios additional signals are available which could help with the detection. But so far we haven't made much progress in this area.


Cheers,
Till


Re: Failure detection in Flink

Sonam Mandal
Hi Till,

Thanks, this helps! Yes, removing the Akka-related configs will definitely help reduce confusion.

One more question: I was going through FLIP-6, and it does talk about the behavior of the various components when failures are detected via heartbeat timeouts etc. Is this the best reference on how Flink reacts to such failure scenarios? If not, can you provide some details on how this works?

Thanks,
Sonam 


Re: Failure detection in Flink

Till Rohrmann
Well, the FLIP-6 documentation is probably the best resource, albeit a bit outdated.

The components react a bit differently:

JobMaster loses heartbeat with a TaskExecutor: The JobMaster will invalidate all slots from this TaskExecutor, which fails the tasks that have been deployed into these slots and in turn triggers a recovery of the affected pipelined regions.

TaskExecutor loses heartbeat with a JobMaster: The TaskExecutor will fail all currently running tasks belonging to the timed-out JobMaster. Moreover, it will release all intermediate result partitions it still keeps for this job. The slots for this JobMaster will transition to an inactive state, in which the TaskExecutor tries to reconnect to the JobMaster in order to offer the slots. If this is not successful within a configurable timeout, these slots will be freed and returned to the ResourceManager.

JobMaster loses heartbeat with the ResourceManager: The JobMaster tries to reconnect to the ResourceManager. Until this has happened, the JobMaster cannot ask for new slots.

ResourceManager loses heartbeat with the JobMaster: The ResourceManager closes the connection to the JobMaster. Moreover, it registers a timeout within which the JobMaster needs to reconnect. If this does not happen, the ResourceManager will clear the declared resources for the job and clean up its internal bookkeeping data structures.
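
To illustrate the pattern underlying all four cases, here is a heavily simplified sketch in Java. It is not Flink's actual implementation (the real code lives in org.apache.flink.runtime.heartbeat and is considerably more involved); it only shows the general idea: each side records when it last heard from a peer and triggers its component-specific reaction once the timeout elapses.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // Simplified sketch, not Flink's actual classes: tracks the last
    // heartbeat per peer and invokes a timeout action, analogous to what
    // e.g. the JobMaster does when a TaskExecutor stops responding.
    public class HeartbeatMonitorSketch {

        // Mirrors Flink's documented defaults for heartbeat.interval /
        // heartbeat.timeout (milliseconds).
        private static final long INTERVAL_MS = 10_000;
        private static final long TIMEOUT_MS = 50_000;

        private final Map<String, Long> lastSeen = new ConcurrentHashMap<>();
        private final ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor();

        // Called whenever a heartbeat from a peer arrives.
        public void receiveHeartbeat(String peerId) {
            lastSeen.put(peerId, System.currentTimeMillis());
        }

        // Periodically checks all tracked peers and reacts to timeouts.
        public void start() {
            scheduler.scheduleAtFixedRate(() -> {
                long now = System.currentTimeMillis();
                lastSeen.forEach((peerId, ts) -> {
                    if (now - ts > TIMEOUT_MS) {
                        lastSeen.remove(peerId);
                        onTimeout(peerId);
                    }
                });
            }, INTERVAL_MS, INTERVAL_MS, TimeUnit.MILLISECONDS);
        }

        // Component-specific reaction: a JobMaster would release the
        // peer's slots and fail the affected tasks, a TaskExecutor would
        // fail the timed-out JobMaster's tasks, and so on (see above).
        protected void onTimeout(String peerId) {
            System.out.println(peerId + " timed out, triggering failover");
        }
    }

In the real system each pair of components monitors the other in both directions, which is why the scenarios above come in mirrored pairs.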

I hope this helps you better understand the failover behavior.

If you want to know something in particular, then let me know.

Cheers,
Till


Re: Failure detection in Flink

Sonam Mandal
Hi Till,

This is really helpful, thanks for the detailed explanation of what happens.
I'll reach out again if I have any further questions. For now, I'm just trying to understand the various failure scenarios and how Flink handles them.

Thanks,
Sonam
