Flink Kafka consumer auto-commit timeout

Rong Rong
Hi All,

I would like to revive a discussion that has come up multiple times in previous ML threads [1], where there seems to be no solution when checkpointing is disabled.

All of the exceptions reported in these ML threads share a common pattern:
INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking the coordinator kafka[xxx]:9092 (id: ??? rack: ???) dead for group consumergroup[xxx]
WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  -Auto-commit of offsets {topic[xxx]=OffsetAndMetadata{offset=???,metadata=???''}} failed for group consumergroup[xxx]: Offset commit failed with a retriable exception. You should retry committing offsets. The underlying error was: The request timed out.
In most cases, enabling OffsetCommitMode.ON_CHECKPOINTS fixes the issue: the Flink Kafka consumer then explicitly commits offsets when checkpointing, which goes down a completely different code path compared with setting the Kafka consumer option enable.auto.commit and letting the Kafka consumer handle it.
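To make the two code paths concrete, here is a simplified sketch of how, as far as I understand, the connector picks its commit mode. It mirrors the decision in OffsetCommitModes.fromConfiguration in the Flink Kafka connector, but the enum and methods below are local stand-ins rather than Flink's own classes, and details may differ between Flink versions.

```java
import java.util.Properties;

/**
 * Simplified sketch of how the Flink Kafka consumer chooses its offset commit mode.
 * Modeled on OffsetCommitModes.fromConfiguration; these are local stand-ins, not Flink classes.
 */
public class CommitModeSketch {

    enum OffsetCommitMode { DISABLED, ON_CHECKPOINTS, KAFKA_PERIODIC }

    static OffsetCommitMode chooseCommitMode(
            boolean enableAutoCommit, boolean commitOnCheckpoints, boolean checkpointingEnabled) {
        if (checkpointingEnabled) {
            // With checkpointing on, enable.auto.commit is not used: Flink commits
            // the offsets of each completed checkpoint itself (or commits nothing).
            return commitOnCheckpoints ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
        }
        // Without checkpointing, committing is delegated to the Kafka client's
        // own periodic auto-commit, if enable.auto.commit is true.
        return enableAutoCommit ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
    }

    static boolean isAutoCommitEnabled(Properties props) {
        // The Kafka consumer treats an unset enable.auto.commit as true.
        return Boolean.parseBoolean(props.getProperty("enable.auto.commit", "true"));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "5000");

        // Checkpointing disabled: the scenario discussed in this thread.
        System.out.println(chooseCommitMode(isAutoCommitEnabled(props), true, false)); // KAFKA_PERIODIC
        // Checkpointing enabled: ON_CHECKPOINTS regardless of enable.auto.commit.
        System.out.println(chooseCommitMode(isAutoCommitEnabled(props), true, true));  // ON_CHECKPOINTS
    }
}
```

In the KAFKA_PERIODIC case Flink does not commit anything itself; the commit is issued by the Kafka client's ConsumerCoordinator, which is the class logging the WARN message above.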

That brings me to the following questions:
- Is this use case (disabling checkpointing and restarting the job from the Kafka-committed GROUP_OFFSETS) not supported?
- How does the Flink Kafka consumer actually handle auto-commit to the coordinator, given that Flink ignores the coordinator's partition assignments and assigns partitions itself?


Some of our observations:
We conducted some experiments with the option enable.auto.commit set to true, using Kafka 0.11 on both Flink 1.4 and 1.6, and observed extremely odd behavior after the above warnings appeared:
- the task manager metric *.Source.KafkaConsumer.current-offsets.topic-[xxx]-partition-[yyy] keeps moving forward, tracking the latest Kafka broker offset, which indicates that the message consumption thread is running without any issue.
- the task manager metric *.Source.KafkaConsumer.committed-offsets.topic-[xxx]-partition-[yyy] stays stuck indefinitely, which indicates that the consumer has stopped talking to the coordinator.
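To catch this symptom automatically, one could sample both gauges per partition and flag a window in which current-offsets advances while committed-offsets never changes. Below is a minimal sketch, assuming the samples have already been collected (for example through a metrics reporter); the Sample class and commitStalled method are hypothetical names, not part of Flink or Kafka.

```java
import java.util.List;

/** Hypothetical diagnostic for "consuming, but no longer committing". */
public class CommitStallCheck {

    /** One sample of both gauges for a single topic partition. */
    static final class Sample {
        final long currentOffset;
        final long committedOffset;

        Sample(long currentOffset, long committedOffset) {
            this.currentOffset = currentOffset;
            this.committedOffset = committedOffset;
        }
    }

    /** True if the current offset advanced over the window but the committed offset never changed. */
    static boolean commitStalled(List<Sample> window) {
        if (window.size() < 2) {
            return false; // not enough data to judge
        }
        Sample first = window.get(0);
        Sample last = window.get(window.size() - 1);
        boolean consuming = last.currentOffset > first.currentOffset;
        boolean committing = false;
        for (Sample s : window) {
            if (s.committedOffset != first.committedOffset) {
                committing = true;
                break;
            }
        }
        return consuming && !committing;
    }

    public static void main(String[] args) {
        List<Sample> stuck = List.of(new Sample(100, 40), new Sample(250, 40), new Sample(400, 40));
        List<Sample> healthy = List.of(new Sample(100, 90), new Sample(250, 240), new Sample(400, 390));
        System.out.println(commitStalled(stuck));   // true
        System.out.println(commitStalled(healthy)); // false
    }
}
```

The window length is left to the caller; it should comfortably exceed auto.commit.interval.ms so that a healthy consumer has had time to commit at least once.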

We will try to reproduce this with Flink 1.10 later, but has anyone experienced similar issues with later Flink releases as well?

Thanks,
Rong

--
[1]