Kafka Offset commit failed on partition

PedroMrChaves

Hello,

Since the last update to the universal Kafka connector, I'm getting the
following error fairly often.

2019-11-18 15:42:52,689 ERROR org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-10, groupId=srx-consumer-group] Offset commit failed on partition events-4 at offset 38173628004: The request timed out.
2019-11-18 15:42:52,707 WARN  org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher - Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.
2019-11-18 15:42:52,707 WARN  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Async Kafka commit failed.


---


I've been investigating but can't find what is causing this. We've been monitoring
several metrics, such as recordsConsumed, fetch-size-avg and fetch-rate, and the values
are the same whether or not the error happens, so we know there isn't a peak of events
or a larger fetch size when the problem occurs. We also monitor CPU, memory, GCs,
network I/O, network connections and disk I/O, but we haven't found anything out of the
ordinary.

Our job has two source nodes reading from two distinct Kafka topics; the problem
happens on both source nodes.
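
To make the shape of the job concrete, it looks roughly like the sketch below (not our
real code: the topic names, the deserialization schema and the rest of the pipeline are
placeholders):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class TwoTopicJobSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the full consumer configuration dumped further down in this mail.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "<hidden>");
        props.setProperty("group.id", "srx-consumer-group");

        // Two independent source nodes in the same consumer group, both on the
        // universal connector; the error shows up on both of them.
        env.addSource(new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props))
           .name("events-source");
        env.addSource(new FlinkKafkaConsumer<>("second-topic", new SimpleStringSchema(), props))
           .name("second-topic-source");

        // ... rest of the pipeline omitted ...
        env.execute("job-sketch");
    }
}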

Flink Version: 1.8.2
Kafka Version: 2.3.0

My Kafka consumer properties:

2019-11-14 16:51:20,142 INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
        auto.commit.interval.ms = 5000
        auto.offset.reset = latest
        bootstrap.servers = [hidden]
        check.crcs = true
        client.id =
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = srx-consumer-group
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = SSL
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = /etc/pki/java/ca.ks
        ssl.keystore.password = [hidden]
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = /etc/pki/java/ca.ks
        ssl.truststore.password = [hidden]
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
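
For what it's worth, the timeout in the error message presumably relates to
request.timeout.ms = 30000 from the dump above, since (as far as I understand it) the
async offset commit is an ordinary broker request. Below is a small sketch of the
entries that seem relevant; raising the timeout is only shown as an idea, not something
we have tried yet:

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

final class TimeoutRelevantProps {
    // Only the entries that look relevant to "The request timed out";
    // everything else stays exactly as in the dump above.
    static Properties timeoutRelevant() {
        Properties props = new Properties();
        // Current value; commit requests, like other broker requests, are bounded by it:
        props.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
        // Current value of the consumer's default API timeout:
        props.setProperty(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "60000");
        // Possible tweak (untested on our side): give commit requests more headroom.
        // props.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000");
        return props;
    }
}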


Commit latency vs commits failed:
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t612/Screenshot_2019-11-26_at_09.png>

We have checkpointing enabled (synchronous checkpoints every 10 seconds). We also have
the job configured to commit Kafka offsets on each checkpoint (the default behaviour).
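
In code, that part of the setup is roughly the following (a simplified sketch, assuming
exactly-once mode; the state backend settings that make our snapshots synchronous are
left out):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;

final class CheckpointSetupSketch {
    // "consumer" stands for each of the FlinkKafkaConsumer instances in the job.
    static void configure(StreamExecutionEnvironment env, FlinkKafkaConsumerBase<?> consumer) {
        env.enableCheckpointing(10_000L, CheckpointingMode.EXACTLY_ONCE); // every 10 seconds

        // Default behaviour once checkpointing is on: offsets are committed back to
        // Kafka only when a checkpoint completes; made explicit here for clarity.
        consumer.setCommitOffsetsOnCheckpoints(true);
    }
}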



-----
Best Regards,
Pedro Chaves