Backpressure on Window step


Backpressure on Window step

Nelson Steven

Hello!

 

We are experiencing occasional backpressure on a Window function in our pipeline. The window is on a KeyedStream and is triggered by EventTimeSessionWindows.withGap(Time.seconds(30)). The prior step does a fanout, and we use the window to sort things into batches based on the key of the keyed stream. We aren't seeing an unreasonable number of records (500-600/s) at a parallelism of 32 (the prior step has a parallelism of 4). We are as interested in learning how to debug the issue as we are in fixing the actual problem. Any ideas?
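
For reference, the shape of the job is roughly this (a sketch only; Event, FanOut, BatchAggregator and EmitBatch are placeholder names, not our actual classes):

    DataStream<Event> fanned = source
        .flatMap(new FanOut())            // the fanout step
        .setParallelism(4);

    fanned
        .keyBy(e -> e.getBatchKey())      // the key we batch on
        .window(EventTimeSessionWindows.withGap(Time.seconds(30)))
        .aggregate(new BatchAggregator(), new EmitBatch())
        .setParallelism(32);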

 

-Steve

Re: Backpressure on Window step

David Anderson
Backpressure is typically caused by one of these things:

* problems relating to i/o to external services (e.g., enrichment via an API or database lookup, or a sink)
* data skew (e.g., a hot key)
* under-provisioning, or competition for resources
* spikes in traffic
* timer storms

I would start to debug this by looking for signs of significant asymmetry in the metrics (across the various subtasks), or resource exhaustion. Could be related to the network, GC, CPU, disk i/o, etc. Flink's webUI will show you checkpoint size and timing information for each sub-task; you can learn a lot from studying that data.

Relating to session windows -- could you occasionally have an unusually long session, and might that cause problems?

Best,
David


RE: Backpressure on Window step

Nelson Steven

First off, thanks for your reply!

 

I have an assumption that I should probably verify first:

When determining the source of the backpressure, we look (in the WebUI) for the first operator in our pipeline that is not showing backpressure; that's what we consider to be the source of the backpressure.

 

In this case, the first operator in our graph that is not showing backpressure is our window operator (although the keyBy operation right before it doesn't show up in the graph). The window function uses a custom aggregation function that builds up a hashmap and a custom process function that emits the hashmap and performs some metrics operations. I'm not sure how this would generate backpressure since it doesn't perform any I/O, but again I might be drawing incorrect conclusions.
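
Roughly, the two functions look like this (a simplified sketch with placeholder types, not our exact code):

    import java.util.HashMap;
    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.metrics.Counter;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    // Incremental aggregation: folds each record into a HashMap, no I/O.
    class BatchAggregator implements
            AggregateFunction<Event, HashMap<String, Event>, HashMap<String, Event>> {

        public HashMap<String, Event> createAccumulator() { return new HashMap<>(); }

        public HashMap<String, Event> add(Event value, HashMap<String, Event> acc) {
            acc.put(value.getId(), value);
            return acc;
        }

        public HashMap<String, Event> getResult(HashMap<String, Event> acc) { return acc; }

        public HashMap<String, Event> merge(HashMap<String, Event> a, HashMap<String, Event> b) {
            a.putAll(b);
            return a;
        }
    }

    // Fires when the session window closes: emits the pre-aggregated map and
    // updates a counter metric, nothing else.
    class EmitBatch extends
            ProcessWindowFunction<HashMap<String, Event>, HashMap<String, Event>, String, TimeWindow> {

        private transient Counter batchesEmitted;

        public void open(Configuration parameters) {
            batchesEmitted = getRuntimeContext().getMetricGroup().counter("batchesEmitted");
        }

        public void process(String key, Context ctx,
                            Iterable<HashMap<String, Event>> aggregated,
                            Collector<HashMap<String, Event>> out) {
            batchesEmitted.inc();
            out.collect(aggregated.iterator().next());  // the single pre-aggregated map
        }
    }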

 

The window function has a parallelism of 32. Each of the subtasks has between 136 KB and 2.45 MB of state, with a checkpoint duration of 280 ms to 2 seconds. Each of the 32 subtasks appears to be handling 1,700-50,000 records an hour, with bytes received ranging from 7 MB to 170 MB.

 

Am I barking up the wrong tree?

 

-Steve

 

 



Re: Backpressure on Window step

David Anderson
Steve,

Your approach to debugging this sounds reasonable, but keep in mind that the backpressure detection built into the WebUI is not infallible. You could have backpressure that it doesn't detect.

FWIW, keyBy isn't an operator -- it's a declaration of how the operators before and after the keyBy are connected. 

Have you tried increasing the parallelism of the task(s) before the window (where the parallelism is currently 4)? Given that your job is already using 32 slots, you have little to lose by doing so. Perhaps the keyBy (and associated change in parallelism) is the first point in your job where the events are being serialized and sent over the network, and maybe 4 instances aren't enough to consistently provide the required throughput.
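
Concretely, something along these lines (just a sketch; FanOut and Event stand in for your actual classes):

    // keyBy only declares how records are partitioned into the window operator;
    // giving the step that feeds it more parallelism gives the shuffle more senders.
    DataStream<Event> fanned = source
        .flatMap(new FanOut())
        .setParallelism(32);   // was 4; the job already uses 32 slots downstream

    KeyedStream<Event, String> keyed = fanned.keyBy(e -> e.getBatchKey());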

David
