Hi,

We have a Flink streaming job that uses the Flink Kafka consumer. Normally it commits consumer offsets to Kafka. However, this stream ended up in a state where it's otherwise working just fine, but it isn't committing offsets to Kafka any more. The job keeps writing correct aggregation results to the sink, though. At the time of writing this, the job has been running for 14 hours without committing offsets.

Below is an extract from taskmanager.log. As you can see, it didn't log anything until ~2018-06-07 22:08. That's also where the log ends; these are the last lines so far.

Could you help check if this is a known bug, possibly already fixed, or something new? I'm using a self-built Flink package 1.5-SNAPSHOT, Flink commit 8395508b0401353ed07375e22882e7581d46ac0e, which is not super old.

Cheers,
Juho

2018-06-06 10:01:33,498 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.1
2018-06-06 10:01:33,498 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : e89bffd6b2eff799
2018-06-06 10:01:33,560 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: 2147483550 rack: null) for group aggregate-all_server_measurements_combined-20180606-1000.
2018-06-06 10:01:33,563 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: 2147483550 rack: null) for group aggregate-all_server_measurements_combined-20180606-1000.
2018-06-07 22:08:28,773 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking the coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: 2147483550 rack: null) dead for group aggregate-all_server_measurements_combined-20180606-1000
2018-06-07 22:08:28,776 WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Auto-commit of offsets {topic1-2=OffsetAndMetadata{offset=12300395550, metadata=''}, topic1-18=OffsetAndMetadata{offset=12299210444, metadata=''}, topic3-0=OffsetAndMetadata{offset=5064277287, metadata=''}, topic4-6=OffsetAndMetadata{offset=5492398559, metadata=''}, topic2-1=OffsetAndMetadata{offset=89817267, metadata=''}, topic1-10=OffsetAndMetadata{offset=12299742352, metadata=''}} failed for group aggregate-all_server_measurements_combined-20180606-1000: Offset commit failed with a retriable exception. You should retry committing offsets.
2018-06-07 22:08:29,840 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking the coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: 2147483550 rack: null) dead for group aggregate-all_server_measurements_combined-20180606-1000
2018-06-07 22:08:29,841 WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Auto-commit of offsets {topic1-6=OffsetAndMetadata{offset=12298347875, metadata=''}, topic4-2=OffsetAndMetadata{offset=5492779112, metadata=''}, topic1-14=OffsetAndMetadata{offset=12299972108, metadata=''}} failed for group aggregate-all_server_measurements_combined-20180606-1000: Offset commit failed with a retriable exception. You should retry committing offsets.
Hi,
What’s your KafkaConsumer configuration? Especially the values for:

- is checkpointing enabled?
- enable.auto.commit (or auto.commit.enable for Kafka 0.8) / auto.commit.interval.ms
- did you set setCommitOffsetsOnCheckpoints()?

Please also refer to https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration , especially this part:

> Note that the Flink Kafka Consumer does not rely on the committed offsets for fault tolerance guarantees. The committed offsets are only a means to expose the consumer’s progress for monitoring purposes.

Can you post the full logs from all TaskManagers/JobManager, and can you say/estimate when the committing broke/stopped? Did you check the Kafka logs for any errors?

To me this seems more like a Kafka issue/bug, especially since in your case the offset committing is superseded by a Kafka coordinator failure.

Piotrek
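For concreteness, the settings asked about above map onto a Flink job roughly as follows. This is a sketch against the Flink 1.5-era Kafka 0.10 connector, not the poster's actual job; the topic, bootstrap host, and interval values are placeholders:

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class ConsumerSetupSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // (1) Is checkpointing enabled? If yes, offsets are committed on
        // completed checkpoints instead of by the Kafka client's auto-commit.
        env.enableCheckpointing(60_000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka-host:9092"); // placeholder
        props.setProperty("group.id", "my_group");
        // (2) Only relevant when checkpointing is DISABLED: the Kafka client
        // then commits periodically on its own.
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "5000");

        FlinkKafkaConsumer010<String> consumer =
            new FlinkKafkaConsumer010<>("topic1", new SimpleStringSchema(), props);
        // (3) Only relevant when checkpointing is ENABLED (true is the default):
        consumer.setCommitOffsetsOnCheckpoints(true);

        env.addSource(consumer).print();
        env.execute("consumer-setup-sketch");
    }
}
```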
Hi Piotr,

thanks for your insights.

> What’s your KafkaConsumer configuration?

We only set these in the properties that are passed to the FlinkKafkaConsumer010 constructor:

auto.offset.reset=latest
bootstrap.servers=my-kafka-hos
group.id=my_group
flink.partition-discovery.inte

> is checkpointing enabled?

No.

> enable.auto.commit (or auto.commit.enable for Kafka 0.8) / auto.commit.interval.ms

We have whatever is the default behaviour of the Flink Kafka consumer. It seems to commit quite often, something like every 5 seconds.

> did you set setCommitOffsetsOnCheckpoints(

No. But I checked with a debugger that apparently enableCommitOnCheck
I also checked with a debugger that offsetCommitMode=KAFKA_PE

So I guess you're right that this bug doesn't seem to be in Flink itself? I wonder if it's a known issue in the Kafka client lib..

I also took a thread dump on one of the task managers in this broken state, but I couldn't spot anything obvious when comparing the threads to a dump from a job where offsets are being committed. Anyway, I've saved the thread dump in case there's something to look for specifically.

Sharing the full logs of the job & task managers would be a bit of a hassle, because I don't have an automatic way to obfuscate the logs so that I'm sure there isn't anything sensitive left. Anyway, there isn't anything else to share really. I wrote: "As you can see, it didn't log anything until ~2018-06-07 22:08. Also that's where the log ends".

Thanks once more.

On Mon, Jun 11, 2018 at 11:18 AM, Piotr Nowojski <[hidden email]> wrote:
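The mode selection described above (checkpointing disabled, auto-commit left at its default) can be sketched as a small decision function. This is an illustrative model of the behaviour discussed in this thread, not Flink's actual source; the class and enum names are hypothetical:

```java
// Sketch of how the effective offset-commit mode follows from the two
// settings discussed above. Names are hypothetical, not Flink internals.
public class OffsetCommitModeSketch {

    public enum Mode { ON_CHECKPOINTS, KAFKA_PERIODIC, DISABLED }

    public static Mode fromConfiguration(boolean checkpointingEnabled,
                                         boolean enableAutoCommit) {
        if (checkpointingEnabled) {
            // offsets are committed when checkpoints complete
            return Mode.ON_CHECKPOINTS;
        }
        // no checkpointing: fall back to the Kafka client's periodic auto-commit
        return enableAutoCommit ? Mode.KAFKA_PERIODIC : Mode.DISABLED;
    }

    public static void main(String[] args) {
        // The setup in this thread: no checkpointing, default auto-commit on.
        System.out.println(fromConfiguration(false, true)); // KAFKA_PERIODIC
    }
}
```

With checkpointing off and `enable.auto.commit` defaulting to true, committing is done entirely inside the Kafka client, which is why the stalled commits point away from Flink itself.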
The more I look into it, the more it seems like a Kafka bug or some cluster failure from which your Kafka cluster did not recover.
In your case auto-committing should be set to true, and in that case the KafkaConsumer should commit offsets every so often while it's polling messages. Unless, for example, `coordinatorUnknown()` returns true in `org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#maybeAutoCommitOffsetsAsync` (Kafka 0.10.2.1 code base):
    private void maybeAutoCommitOffsetsAsync(long now) {
        if (autoCommitEnabled) {
            if (coordinatorUnknown()) {
                this.nextAutoCommitDeadline = now + retryBackoffMs;
            } else if (now >= nextAutoCommitDeadline) {
                this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
                doAutoCommitOffsetsAsync();
            }
        }
    }

Have you checked the Kafka logs? This suggests that the real problem is hidden behind:

> INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking the coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: 2147483550 rack: null) dead for group aggregate-all_server_measurements_combined-20180606-1000

And maybe your Kafka cluster/consumer can not recover from this situation.

Another, simpler thing to try is just upgrading the Kafka cluster.

Piotrek
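The `maybeAutoCommitOffsetsAsync` logic quoted above can be exercised in isolation. The following is a minimal, self-contained simulation (not Kafka code; the field names mirror the snippet, and the backoff/interval values are illustrative). It shows that while the coordinator stays unknown, the deadline is perpetually pushed forward by `retryBackoffMs`, so no commit is ever attempted:

```java
// Simulation of the auto-commit scheduling quoted above (illustrative values).
public class AutoCommitSimulation {
    long retryBackoffMs = 100;
    long autoCommitIntervalMs = 5000;
    long nextAutoCommitDeadline = 0;
    int commits = 0;
    boolean coordinatorUnknown;

    void maybeAutoCommitOffsetsAsync(long now) {
        if (coordinatorUnknown) {
            // coordinator dead: just push the deadline forward, commit nothing
            nextAutoCommitDeadline = now + retryBackoffMs;
        } else if (now >= nextAutoCommitDeadline) {
            nextAutoCommitDeadline = now + autoCommitIntervalMs;
            commits++; // stands in for doAutoCommitOffsetsAsync()
        }
    }

    public static void main(String[] args) {
        AutoCommitSimulation sim = new AutoCommitSimulation();
        sim.coordinatorUnknown = true;
        // Simulate one minute of poll() calls every 100 ms with a dead coordinator.
        for (long now = 0; now < 60_000; now += 100) {
            sim.maybeAutoCommitOffsetsAsync(now);
        }
        System.out.println("commits while coordinator unknown: " + sim.commits); // 0
    }
}
```

So if the consumer never re-discovers the coordinator, this path silently stops committing without any further errors, which matches the quiet logs reported in this thread.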
Probably your Kafka consumer is rebalancing. This can happen when message processing takes too long, causing the Kafka broker to mark your consumer as dead and trigger a rebalance. This all happens before the consumer can commit the offsets.

On Mon, Jun 11, 2018 at 7:37 PM Piotr Nowojski <[hidden email]> wrote:
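If slow processing and rebalancing were the cause, the standard Kafka consumer settings below (available since 0.10.1) are the knobs to tune. The values are illustrative only, not a recommendation for this particular job:

```java
import java.util.Properties;

// Illustrative tuning for consumers evicted by the group coordinator because
// of slow processing. Values are examples, not recommended defaults.
public class RebalanceTuning {
    public static Properties tuned() {
        Properties props = new Properties();
        props.setProperty("session.timeout.ms", "30000");    // heartbeat-based liveness window
        props.setProperty("heartbeat.interval.ms", "10000"); // must be < session.timeout.ms
        props.setProperty("max.poll.interval.ms", "600000"); // max gap allowed between poll() calls
        props.setProperty("max.poll.records", "250");        // smaller batches, more frequent polls
        return props;
    }
}
```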
Hi,
Thanks for your analysis. We found that LeaderElectionRateAndTimeMs went to a non-zero value on Kafka around the same time this error was seen in the Flink job. Kafka itself recovers from this, and so do any other consumers that we have. It seems like a bug in the Kafka consumer library if this error causes it to stop committing offsets. If you have any further insight into this, please let me know.

Apart from that, leader election doesn't happen in normal operation, but it can happen for example if there are connectivity problems between the Kafka nodes.

On Mon, Jun 11, 2018 at 6:41 PM amit pal <[hidden email]> wrote: