Failures due to inevitable high backpressure

Failures due to inevitable high backpressure

Hubert
Hello, 

My Flink application has entered a bad state, and I was wondering if I could get some advice on how to resolve the issue.

The sequence of events that led to a bad state: 

1. A failure occurs (e.g., TM timeout) within the cluster
2. The application successfully recovers from the last completed checkpoint
3. The application consumes events from Kafka as quickly as it can. This introduces high backpressure. 
4. A checkpoint is triggered
5. Another failure occurs (e.g., TM timeout, checkpoint timeout, Kafka transaction timeout) and the application loops back to step #2. This creates a vicious cycle where no progress is made. 

I believe the underlying issue is that the application is experiencing high backpressure. This can cause the TM to miss heartbeat responses, or lead to long checkpoint durations because the checkpoint barriers are processed with delay.

I'm unsure of the best next steps to take. How do I ensure that heartbeats and checkpoints complete successfully when the application is under inevitable high backpressure?

Best,
Hubert

Re: Failures due to inevitable high backpressure

David Anderson
You should begin by trying to identify the cause of the backpressure, because the appropriate fix depends on the details. 

Possible causes that I have seen include:

- the job is inadequately provisioned
- blocking I/O is being done in a user function (the usual mitigation is Flink's Async I/O; see the sketch after this list)
- a huge number of timers are firing simultaneously
- event time skew between different sources is causing large amounts of state to be buffered
- data skew (a hot key) is overwhelming one subtask or slot
- external systems can't keep up (e.g., a sink)
- lengthy GC pauses caused by running lots of slots per TM with the FsStateBackend
- contention for critical resources (e.g., using a NAS as the local disk for RocksDB)
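
For the blocking I/O case in particular, the fix is usually to use Flink's Async I/O operator instead of calling the external service directly from a map function. A rough sketch (the lookup client and all names here are made up, not taken from your job):

    import org.apache.flink.streaming.api.datastream.AsyncDataStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

    import java.util.Collections;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;

    // Hypothetical enrichment: look up each key without blocking the task thread.
    public class AsyncLookup extends RichAsyncFunction<String, String> {
        @Override
        public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
            CompletableFuture
                .supplyAsync(() -> lookup(key)) // in practice, a non-blocking client call
                .thenAccept(value -> resultFuture.complete(Collections.singleton(value)));
        }

        private String lookup(String key) {
            return key; // placeholder for the real external call
        }
    }

    // Wire it into the pipeline with a timeout and a cap on in-flight requests (values are examples):
    DataStream<String> enriched = AsyncDataStream.unorderedWait(
            input, new AsyncLookup(), 5, TimeUnit.SECONDS, 100);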

Unaligned checkpoints [1], new in Flink 1.11, should address this problem in some cases, depending on the root cause. But first you should try to figure out why you have high backpressure, because a number of the causes listed above won't be helped by changing to unaligned checkpoints.
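
For reference, if you decide to try them (and you are already on 1.11), enabling them programmatically looks roughly like this; the checkpoint interval shown is just an example:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000);                        // e.g. one checkpoint per minute
    env.getCheckpointConfig().enableUnalignedCheckpoints(); // new in Flink 1.11
    // or, equivalently, set execution.checkpointing.unaligned: true in flink-conf.yaml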

Best,
David


Re: Failures due to inevitable high backpressure

David Anderson
One other thought: some users experiencing this have found it preferable to increase the checkpoint timeout to the point where it is effectively infinite. Checkpoints that can't time out are likely to complete eventually, which is better than landing in the vicious cycle you described.
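
For example, on the CheckpointConfig of your StreamExecutionEnvironment (a sketch; pick whatever value effectively disables the timeout for you):

    // Make the checkpoint timeout effectively infinite (the default is 10 minutes).
    env.getCheckpointConfig().setCheckpointTimeout(Long.MAX_VALUE);
    // or raise execution.checkpointing.timeout in flink-conf.yaml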

David

Re: Failures due to inevitable high backpressure

Arvid Heise
Hi Hubert,

The most straightforward reason for backpressure is under-provisioning of the cluster. An application usually needs gradually more resources over time. If your company's user base grows, so does the volume of messages (be it click streams, page impressions, or any kind of transactions). Often the operator state grows as well. Sometimes it's simply that the events themselves become more complex, so you need more overall bandwidth. This means that from time to time you need to increase Flink's memory (for state) or the number of compute nodes (to handle more events). In the same way, you need to make sure that your sink scales as well.

If you fail to keep up with the demand, the application gradually becomes more unstable, and at some point you end up in the vicious cycle you described, where the system cannot catch up even during off-hours when the number of events is small.

First, it's important to understand what the bottleneck is. The web UI should help you narrow it down quickly.
Second, if a TM becomes unresponsive, chances are high that it ran out of memory (on-heap or off-heap), so it might be enough to increase memory. In any case, I'd expect one of the TM logs to show an exception. You could also profile the GC time of the TMs.
Third, you might also want to check your state size. If it grows over time, it might be some kind of leak (logic errors are also common, where too much is held in state and never evicted); see the state TTL sketch after these points for one way to bound such growth.
Fourth, closely monitor how the application behaves during recovery. Is it making progress at all, or is it stalling at the same point?
Fifth, it might be worthwhile to temporarily add a compute node to the cluster just so that everything runs again, and remove it afterwards. If you now have two days of data to reprocess in order to catch up, even the aforementioned tweaks may not be enough.
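
Regarding the third point: if expiring old entries fits your semantics, state TTL is one common way to keep keyed state from growing without bound. A minimal sketch (the descriptor name and the TTL value are made up):

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.time.Time;

    StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.days(7))     // expire entries 7 days after the last write
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build();

    ValueStateDescriptor<Long> lastSeen = new ValueStateDescriptor<>("lastSeen", Long.class);
    lastSeen.enableTimeToLive(ttlConfig); // attach the TTL before the state is acquired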

Best,

Arvid
