Hi,
I am using Kafka in a couple of programs, including Flink, and I am quite
confused about how the group.id parameter works with the Flink Kafka consumer. I
have two consumers (one inside Flink, one outside) running on the same
topic with the same group.id. From my inspection, they work independently: if
I send one message to the Kafka topic, both consumers receive
it. But according to the Kafka documentation, only one consumer (same topic, same group)
should get the message. I also
tried starting two consumers, both inside the Flink main function, and it
seems that only one of them receives all the messages from Kafka. So I am confused: how does Flink deal with this kind of situation? Thanks a lot
Hi,

1. What Flink Kafka connector version are you using?
2. How is your non-Flink consumer fetching data from the topic? Is it using the old SimpleConsumer, the old high-level consumer, or the new consumer API?
3. If you are using the new consumer API, are you using "consumer.assign(…)" or "consumer.subscribe(…)"?

Internally, the Flink Kafka connectors don't use Kafka's consumer group management functionality, because they use lower-level APIs (SimpleConsumer in 0.8, and KafkaConsumer#assign(…) in 0.9) on each parallel instance for more control over the consumption of individual partitions. So, essentially, the "group.id" setting in the Flink Kafka connector is only used for committing offsets back to ZK / the Kafka brokers.

From your description I'm not entirely sure what's happening, so it'd be helpful if you could answer the questions above so we can clarify things further :)

Regards,
Gordon

On July 29, 2016 at 12:11:19 AM, 王萌 wrote:
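A toy model of the distinction Gordon describes, in plain Java with no Kafka dependency (the class and method names are hypothetical, not Kafka or Flink APIs). With subscribe(), members sharing a group.id have the topic's partitions divided among them, so each partition has exactly one owner. With assign(), each consumer reads whatever partitions it names, and group.id plays no part in who receives what. Round-robin is used here only as a stand-in for Kafka's real partition assignors.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model, not Kafka or Flink code.
final class PartitionAssignmentModel {

    // subscribe()-style: the group splits the topic's partitions among its
    // members, so each partition gets exactly one owner. Round-robin is an
    // assumption standing in for Kafka's actual assignors. members must be non-empty.
    static Map<String, List<Integer>> groupManaged(List<String> members, int numPartitions) {
        Map<String, List<Integer>> owned = new LinkedHashMap<>();
        for (String member : members) {
            owned.put(member, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            owned.get(members.get(p % members.size())).add(p);
        }
        return owned;
    }

    // assign()-style: every consumer keeps exactly the partitions it asked for;
    // other consumers with the same group.id do not change that.
    static Map<String, List<Integer>> manual(Map<String, List<Integer>> requested) {
        return new LinkedHashMap<>(requested);
    }

    // Consumers that would read a record written to the given partition.
    static List<String> receivers(Map<String, List<Integer>> assignment, int partition) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> e : assignment.entrySet()) {
            if (e.getValue().contains(partition)) {
                result.add(e.getKey());
            }
        }
        return result;
    }
}
```

With a one-partition topic, the subscribe()-style model hands partition 0 to a single member, while two assign()-style consumers that both name partition 0 each receive every record. That is consistent with the Flink and non-Flink consumers in the first post both seeing the same message.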
Hi Gordon,
I am consuming messages from Kafka with FlinkKafkaConsumer09 and have also specified the group.id. I have enabled checkpointing, along with the configs auto.commit.enable=true and auto.offset.reset=earliest. From your post I understand that group.id is not very useful as far as offset monitoring is concerned. Is there any way to get the consumer's current offset, the topic's current offset, and the offset lag? I looked at the accumulators in the Flink dashboard but could not understand what the following accumulator names represent:
1. consumer-records-lag-max
2. consumer-records-consumed-rate
3. consumer-request-rate
These seem relevant to the info I am looking for. I need help understanding the accumulators.
Thanks,
Mojes
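The three names appear to correspond to Kafka consumer fetch metrics, which the Kafka documentation describes roughly as: records-lag-max, the maximum lag in number of records for any partition over the recent sampling window; records-consumed-rate, the average number of records consumed per second; and request-rate, the average number of requests sent per second. A partition's lag is its high watermark (roughly, its latest available offset) minus the consumer's current position. The sketch below computes that per partition and takes the maximum. ConsumerLag and its methods are hypothetical; offsets are supplied by hand rather than fetched from a broker, and it takes a single snapshot rather than Kafka's windowed maximum.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Kafka or Flink.
final class ConsumerLag {

    // Per-partition lag = end offset minus the consumer's current position.
    static Map<Integer, Long> perPartition(Map<Integer, Long> endOffsets,
                                           Map<Integer, Long> positions) {
        Map<Integer, Long> lag = new TreeMap<>();
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            Long position = positions.get(e.getKey());
            if (position == null) {
                continue; // this consumer is not reading the partition: skip it
            }
            // Clamp at zero in case the position snapshot is newer than the end offset.
            lag.put(e.getKey(), Math.max(0L, e.getValue() - position));
        }
        return lag;
    }

    // Snapshot analogue of records-lag-max: the largest lag over all partitions.
    static long maxLag(Map<Integer, Long> lag) {
        long max = 0L;
        for (long l : lag.values()) {
            max = Math.max(max, l);
        }
        return max;
    }
}
```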
From the code in Kafka09Fetcher.java:

    // if checkpointing is enabled, we are not automatically committing to Kafka.
    kafkaProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
            Boolean.toString(!runtimeContext.isCheckpointingEnabled()));

If Flink checkpointing is enabled, auto-commit in Kafka is disabled. I am still trying to find out how to get an external view of the offsets from the Kafka offset utilities in this scenario.

Thanks,
Prabhu
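A minimal sketch of the rule in the quoted line, using only java.util.Properties in place of Flink's fetcher: the Kafka-level auto-commit flag is forced to the opposite of the checkpointing flag, overriding whatever the user configured. AutoCommitRule and apply are hypothetical names; "enable.auto.commit" is the string value of ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG.

```java
import java.util.Properties;

// Hypothetical sketch mirroring the Kafka09Fetcher line; not Flink code.
final class AutoCommitRule {

    // Value of ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG in kafka-clients.
    static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";

    // Returns a copy of the user's properties with auto-commit switched on
    // only when checkpointing is off; the input is left unchanged.
    static Properties apply(Properties userProps, boolean checkpointingEnabled) {
        Properties props = new Properties();
        props.putAll(userProps);
        props.setProperty(ENABLE_AUTO_COMMIT, Boolean.toString(!checkpointingEnabled));
        return props;
    }
}
```

So with checkpointing enabled, a user-supplied enable.auto.commit=true ends up as false, while other settings such as group.id pass through untouched.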
Hi,
you can get the offsets (current and committed) in Flink 1.1 using Flink metrics. In Flink 1.0, we expose the Kafka internal metrics via the accumulator system (so you can access them from the web interface as well). IIRC, Kafka exposes a metric for the lag as well.

On Mon, Aug 8, 2016 at 8:15 PM, [hidden email] wrote: