Throttling/effective back-pressure on a Kafka sink

Marc Rooding
Hi

We’ve got a job producing to a Kafka sink. The Kafka topics have a retention of 2 weeks. When doing a complete replay, it seems like Flink isn’t able to back-pressure or throttle the number of messages going to Kafka, causing the following error:

org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Expiring 8396 record(s) for topic-1:120000 ms has passed since batch creation

We’re running on Flink 1.7.2 with flink-connector-kafka:1.7.2. Our Kafka cluster is running version 2.1.1. The Kafka producer uses all default settings except for:

compression.type = snappy
max.in.flight.requests.per.connection = 1
acks = all
client.dns.lookup = use_all_dns_ips
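
For anyone trying to reproduce this setup, the overrides listed above could be collected into a java.util.Properties object roughly as follows. This is only a sketch: the class name, the method name and the broker address are made up for illustration, and the way the properties are handed to the Flink Kafka producer is left out because the post does not show that code.

```java
import java.util.Properties;

class ReplayProducerSettings {
    // Collects the four overrides from the post; every other producer
    // setting is left at its Kafka default.
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        // Placeholder: the post does not mention the broker address.
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("compression.type", "snappy");
        props.setProperty("max.in.flight.requests.per.connection", "1");
        props.setProperty("acks", "all");
        props.setProperty("client.dns.lookup", "use_all_dns_ips");
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting settings, one per line.
        build("localhost:9092").forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```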

I tried playing around with the buffer and batch settings and increasing timeouts, but none of these changes seems to be what we need. Increasing delivery.timeout.ms and request.timeout.ms solves the initial error, but causes the Flink job to fail entirely due to:
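
As a side note on the timeouts: the 120000 ms in the first error appears to match the default of delivery.timeout.ms in Kafka producer clients from 2.1 on, and those clients require delivery.timeout.ms to be at least linger.ms plus request.timeout.ms. A sketch of raising both timeouts while respecting that rule might look like the following. The class and method names are hypothetical and the concrete values are illustrative, not the ones used in the job described here.

```java
import java.util.Properties;

class ReplayTimeouts {
    // Returns producer overrides for the two timeouts mentioned in the post.
    // Rejects combinations that the Kafka producer itself would refuse.
    static Properties withTimeouts(int deliveryTimeoutMs, int requestTimeoutMs, int lingerMs) {
        if ((long) deliveryTimeoutMs < (long) lingerMs + requestTimeoutMs) {
            throw new IllegalArgumentException(
                "delivery.timeout.ms must be >= linger.ms + request.timeout.ms");
        }
        Properties props = new Properties();
        props.setProperty("delivery.timeout.ms", Integer.toString(deliveryTimeoutMs));
        props.setProperty("request.timeout.ms", Integer.toString(requestTimeoutMs));
        props.setProperty("linger.ms", Integer.toString(lingerMs));
        return props;
    }

    public static void main(String[] args) {
        // Illustrative values only: 10 minutes delivery, 1 minute request timeout.
        System.out.println(withTimeouts(600_000, 60_000, 0));
    }
}
```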

Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
Caused by: java.lang.RuntimeException: Buffer pool is destroyed.

My assumption is that the Kafka producer will start blocking since it notices that it can't handle the batches, and Flink eventually runs out of buffers for the operator.

What really baffles me is that the backpressure tab shows that everything is OK. The entire job pipeline (which reads from 4 different topics, unions them all and sinks to 1 topic) pushes all the messages through to the sink stage, resulting in 18 million incoming messages at the sink stage, even though Kafka cannot possibly keep up with this.

I searched for others facing the same issue but can't find anything similar. I'm hoping that someone here could guide me in the right direction.

Thanks in advance

Re: Throttling/effective back-pressure on a Kafka sink

Konstantin Knauf-2
Hi Marc,

The Kafka producer should be able to create backpressure. Could you try to increase max.block.ms to Long.MAX_VALUE?
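
A sketch of what that suggestion could look like in code, assuming the producer settings are built as a java.util.Properties object (the class and method names here are hypothetical). max.block.ms bounds how long the producer's send() call may block, for example while its record buffer is full, so the largest long value effectively makes it wait indefinitely instead of failing.

```java
import java.util.Properties;

class BlockingProducerSettings {
    // Copies the given settings and sets max.block.ms to Long.MAX_VALUE,
    // so a send() that has to wait for buffer space never times out.
    static Properties withUnboundedBlocking(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty("max.block.ms", Long.toString(Long.MAX_VALUE));
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("acks", "all");
        System.out.println(withUnboundedBlocking(base));
    }
}
```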

The exceptions you shared for the failure case don't look like the root causes of the problem. Could you share the full stacktraces or even full logs for this time frame? Feel free to send these logs to me directly, if you don't want to share them on the list.

Best,

Konstantin




On Thu, Mar 28, 2019 at 2:04 PM Marc Rooding <[hidden email]> wrote:

--

Konstantin Knauf | Solutions Architect

+49 160 91394525


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen   
Re: Throttling/effective back-pressure on a Kafka sink

Derek VerLee

Was any progress ever made on this? We have seen the same issue in the past. What I do remember is that the job crashed after however long I had set max.block.ms to.
I am going to attempt to reproduce the issue again and will report back.
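
That observation is at least consistent with how a bounded buffer with a blocking timeout behaves in general: if the buffer stays full for longer than the timeout, the caller gets a failure instead of continuing to wait. The following toy model illustrates this with an ArrayBlockingQueue. It is not the Kafka producer's implementation, all names are made up, and a real producer would throw an exception where this model returns false.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class BoundedSendModel {
    private final BlockingQueue<String> buffer;
    private final long maxBlockMs;

    BoundedSendModel(int capacity, long maxBlockMs) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
        this.maxBlockMs = maxBlockMs;
    }

    // Waits up to maxBlockMs for free space; returns false if none appeared.
    boolean send(String record) {
        try {
            return buffer.offer(record, maxBlockMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Models the downstream side acknowledging one record, freeing space.
    String drainOne() {
        return buffer.poll();
    }

    public static void main(String[] args) {
        BoundedSendModel model = new BoundedSendModel(2, 50);
        System.out.println(model.send("a")); // space available
        System.out.println(model.send("b")); // space available
        System.out.println(model.send("c")); // buffer full, nothing drains it
    }
}
```

In this model, a larger timeout only postpones the failure unless something drains the buffer in the meantime.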


On 3/28/19 3:27 PM, Konstantin Knauf wrote: