Hello,
Since the last update to the universal Kafka connector, I've been getting the following error fairly often:

2019-11-18 15:42:52,689 ERROR org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-10, groupId=srx-consumer-group] Offset commit failed on partition events-4 at offset 38173628004: The request timed out.
2019-11-18 15:42:52,707 WARN org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher - Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.
2019-11-18 15:42:52,707 WARN org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Async Kafka commit failed.

I've been investigating but can't find what is causing this. We've been monitoring several metrics, such as recordsConsumed, fetch-size-avg, and fetch-rate, but their values are the same whether or not the error happens, so we know there is no peak of events or a larger fetch size when the problem occurs. We also monitor CPU, memory, GCs, network I/O, network connections, and disk I/O, but we haven't found anything out of the ordinary.

Our job has two source nodes reading from two distinct Kafka topics; the problem happens on both source nodes.
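For context, this is roughly how the two sources are wired up. It's a simplified sketch: "events" comes from the log excerpt above, while the second topic name, the sink, and the deserialization schema are placeholders (our real job uses byte-array deserialization, per the config dump below).

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class JobSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9093"); // real servers hidden, as in the config dump below
        props.setProperty("group.id", "srx-consumer-group");

        // Two independent source nodes, each reading its own topic; the commit
        // failures show up on both. "other-topic" is a placeholder name.
        env.addSource(new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props))
           .name("events-source")
           .print();
        env.addSource(new FlinkKafkaConsumer<>("other-topic", new SimpleStringSchema(), props))
           .name("other-topic-source")
           .print();

        env.execute("srx-job");
    }
}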
Flink version: 1.8.2
Kafka version: 2.3.0

My Kafka consumer properties:

2019-11-14 16:51:20,142 INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [hidden]
    check.crcs = true
    client.id =
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = srx-consumer-group
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = SSL
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = /etc/pki/java/ca.ks
    ssl.keystore.password = [hidden]
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = /etc/pki/java/ca.ks
    ssl.truststore.password = [hidden]
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

Commit latency vs commits failed:
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t612/Screenshot_2019-11-26_at_09.png>

We have checkpointing enabled (synchronous checkpoints every 10 secs). We also have the job configured to commit Kafka offsets on each checkpoint (the default behaviour).
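In case it's relevant, this is roughly how the checkpointing and offset-commit side is set up. Again a simplified sketch: the broker address and topic are placeholders, setCommitOffsetsOnCheckpoints(true) is only spelling out the default, and the request.timeout.ms override is a hypothetical experiment (the failing commit appears to hit the 30000 ms default visible in the dump above), not something we have verified fixes the issue.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class CheckpointSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 10 seconds, as described above.
        env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9093"); // hidden above
        props.setProperty("group.id", "srx-consumer-group");
        // Hypothetical experiment: raise request.timeout.ms above the 30000 ms
        // default that the failing offset commit seems to run into.
        props.setProperty("request.timeout.ms", "60000");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props);

        // Commit offsets back to Kafka on each completed checkpoint; this is
        // the default when checkpointing is enabled, shown here for clarity.
        consumer.setCommitOffsetsOnCheckpoints(true);

        env.addSource(consumer).print();
        env.execute("srx-job");
    }
}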
Best Regards,
Pedro Chaves