Flink kafka group question

王萌
Hi, I am using Kafka in a couple of programs, including Flink, and I am quite confused about how the group.id parameter works in Flink with the Kafka consumer.
I have 2 consumers (one inside Flink, one outside) running on the same topic with the same group.id. From my inspection, they work independently:
if I send one message to the Kafka topic, both consumers receive it. But according to the Kafka documentation, only one consumer (same topic, same group) should get the message.

I also tried starting 2 consumers inside the Flink main function, and it seems that only one consumer receives all messages from Kafka.

So I'm quite confused: how does Flink deal with this kind of situation?

Thanks a lot


Re: Flink kafka group question

Tzu-Li Tai
Hi,

1. What Flink Kafka connector version are you using?
2. How is your non-Flink consumer fetching data from the topic? Is it using the old SimpleConsumer, old High-Level Consumer, or the new consumer API?
3. If you are using the new consumer API, are you using "consumer.assign(…)" or "consumer.subscribe(…)"?

Internally, the Flink Kafka connectors don’t use the consumer group management functionality, because they use lower-level APIs (SimpleConsumer in 0.8, and KafkaConsumer#assign(…) in 0.9) on each parallel instance for more control over individual partition consumption. So, essentially, the “group.id” setting in the Flink Kafka connector is only used for committing offsets back to ZK / Kafka brokers.
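To illustrate the difference described here, below is a toy Java model (hypothetical class and method names, not Kafka or Flink code) of subscribe-style group balancing versus assign-style manual assignment. It assumes a single topic with numbered partitions and uses round-robin splitting as a stand-in for Kafka's configurable partition assignor.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Toy model of two ways a consumer can obtain partitions (not real Kafka code).
 * subscribe(): the group coordinator splits partitions among all members that
 *              subscribed with the same group.id.
 * assign():    the consumer picks its partitions itself; it is not balanced
 *              against group members, so a shared group.id does not prevent
 *              two consumers from reading the same partitions.
 */
class AssignmentModel {

    /** Round-robin split among subscribed members; assumes at least one member. */
    static Map<String, List<Integer>> subscribeSplit(List<String> members, int numPartitions) {
        Map<String, List<Integer>> result = new TreeMap<>();
        for (String member : members) {
            result.put(member, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            result.get(members.get(p % members.size())).add(p);
        }
        return result;
    }

    /** A manually assigned consumer that lists every partition reads all of them. */
    static List<Integer> manualAssign(int numPartitions) {
        List<Integer> all = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            all.add(p);
        }
        return all;
    }

    public static void main(String[] args) {
        // Two subscribers sharing a group.id: each partition goes to exactly one of them.
        System.out.println(subscribeSplit(Arrays.asList("consumer-a", "consumer-b"), 4));
        // prints {consumer-a=[0, 2], consumer-b=[1, 3]}

        // A lone subscriber gets every partition, and so does an assign()-based
        // consumer that lists all partitions, so both see every message.
        System.out.println(subscribeSplit(Arrays.asList("external"), 4)); // {external=[0, 1, 2, 3]}
        System.out.println(manualAssign(4));                              // [0, 1, 2, 3]
    }
}
```

Under this model, the external consumer in the original question is the only subscribed member of its group and therefore receives every partition, while the Flink consumer reads its partitions through assign() regardless of group membership; that would be consistent with both consumers seeing the same message.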

From your description I’m not entirely sure of what’s happening, so it’d be helpful if you can provide info about the above questions so we can clarify things further :)

Regards,
Gordon


(In reply to 王萌's message of July 29, 2016 at 12:11:19 AM.)


Re: Flink kafka group question

Mojes
Hi Gordon,

I am consuming messages from Kafka with FlinkKafkaConsumer09, and I have also specified the group.id.
I have enabled checkpointing, along with the following configs:
auto.commit.enable=true
auto.offset.reset=earliest


From your post, I understand that group.id is not very useful as far as offset monitoring is concerned.

Is there any way to get the following information:
the current offset of the consumer,
the current offset of the topic, and the offset lag?
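One common way to express the lag asked about here is, per partition, the latest offset in the partition minus the consumer's current position (the offset it will read next); the maximum over all partitions is what a "records-lag-max"-style metric reports. The Java sketch below assumes that definition; the class name and the offsets are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of consumer lag, assuming lag = latest offset in a partition minus
 * the offset the consumer will read next. Example offsets are made up.
 */
class LagSketch {

    /** Lag of one partition; clamped at zero. */
    static long lag(long latestOffset, long consumerPosition) {
        return Math.max(0L, latestOffset - consumerPosition);
    }

    /** Largest lag across partitions, in the spirit of a records-lag-max metric. */
    static long maxLag(Map<Integer, Long> latestOffsets, Map<Integer, Long> positions) {
        long max = 0L;
        for (Map.Entry<Integer, Long> e : latestOffsets.entrySet()) {
            Long position = positions.get(e.getKey());
            if (position == null) {
                continue; // no known position for this partition: skipped in this sketch
            }
            max = Math.max(max, lag(e.getValue(), position));
        }
        return max;
    }

    public static void main(String[] args) {
        Map<Integer, Long> latest = new HashMap<>();
        Map<Integer, Long> positions = new HashMap<>();
        latest.put(0, 1500L);
        positions.put(0, 1480L); // partition 0 is 20 records behind
        latest.put(1, 900L);
        positions.put(1, 900L);  // partition 1 is caught up
        System.out.println("max lag = " + maxLag(latest, positions)); // prints "max lag = 20"
    }
}
```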

I looked at the accumulators in the Flink dashboard but could not work out
what the following accumulators represent:
1. consumer-records-lag-max
2. consumer-records-consumed-rate
3. consumer-request-rate

These seem relevant to the information I am looking for.

I need help understanding these accumulators.
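The three accumulator names appear to correspond to Kafka consumer metrics: records-lag-max (the largest per-partition lag, in records, seen in the current window), records-consumed-rate (average records consumed per second) and request-rate (average requests sent per second). As a rough, hypothetical model of what a "per second" rate means, this Java sketch counts events in a sliding time window and divides by the window length. Kafka's own metrics use sampled windows, so its exact numbers will differ; the class name and window size are assumptions.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Simplified model of a per-second rate metric (not Kafka's implementation):
 * sum the events recorded within the last windowMs and divide by the window
 * length in seconds.
 */
class WindowedRate {
    private final long windowMs;
    private final Deque<long[]> samples = new ArrayDeque<>(); // each entry: {timestampMs, count}

    WindowedRate(long windowMs) {
        this.windowMs = windowMs;
    }

    /** Record that `count` events (e.g. records or requests) happened at nowMs. */
    void record(long nowMs, long count) {
        samples.addLast(new long[] {nowMs, count});
        evict(nowMs);
    }

    /** Average events per second over the window ending at nowMs. */
    double ratePerSecond(long nowMs) {
        evict(nowMs);
        long total = 0;
        for (long[] sample : samples) {
            total += sample[1];
        }
        return total / (windowMs / 1000.0);
    }

    // Drop samples that fall outside the window (nowMs - windowMs, nowMs].
    private void evict(long nowMs) {
        while (!samples.isEmpty() && samples.peekFirst()[0] <= nowMs - windowMs) {
            samples.removeFirst();
        }
    }

    public static void main(String[] args) {
        WindowedRate consumed = new WindowedRate(10_000); // hypothetical 10 s window
        consumed.record(1_000, 300); // a poll returned 300 records at t = 1 s
        consumed.record(6_000, 200); // another returned 200 records at t = 6 s
        System.out.println(consumed.ratePerSecond(9_000)); // 500 records / 10 s, prints 50.0
    }
}
```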

Thanks,
Mojes


Re: Flink kafka group question

vprabhu@gmail.com
From the code in Kafka09Fetcher.java:

    // if checkpointing is enabled, we are not automatically committing to Kafka.
    kafkaProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
            Boolean.toString(!runtimeContext.isCheckpointingEnabled()));


If Flink checkpointing is enabled, Kafka auto-commit is disabled. I am still trying to find out how to get an external view of the offsets from the Kafka offset utilities in this scenario.
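The effect of the quoted line can be modelled in a few lines of standalone Java. This is a sketch, not Flink's code; the class name, broker address and group name are hypothetical. It applies the same rule to a configuration like the one posted earlier in the thread. One detail worth checking: the 0.9 consumer reads the key enable.auto.commit (the value of ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), whereas auto.commit.enable is the name of that setting in the older 0.8 consumer.

```java
import java.util.Properties;

/**
 * Standalone model of the rule quoted from Kafka09Fetcher: with Flink
 * checkpointing on, enable.auto.commit is forced to false, whatever the
 * user configured. Not Flink code.
 */
class AutoCommitOverride {

    static Properties effectiveConsumerProps(Properties userProps, boolean checkpointingEnabled) {
        Properties props = new Properties();
        props.putAll(userProps); // copy so the caller's Properties stay untouched
        // Same decision as the quoted line: auto-commit only without checkpointing.
        props.setProperty("enable.auto.commit", Boolean.toString(!checkpointingEnabled));
        return props;
    }

    public static void main(String[] args) {
        Properties user = new Properties();
        user.setProperty("bootstrap.servers", "localhost:9092"); // hypothetical broker
        user.setProperty("group.id", "my-flink-group");          // hypothetical group
        user.setProperty("enable.auto.commit", "true");
        user.setProperty("auto.offset.reset", "earliest");

        Properties withCheckpointing = effectiveConsumerProps(user, true);
        System.out.println(withCheckpointing.getProperty("enable.auto.commit")); // prints false
    }
}
```

With checkpointing enabled, the user's auto-commit value is overwritten either way, so the key name only matters for jobs that run without checkpointing.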

Thanks,
Prabhu

Re: Flink kafka group question

rmetzger0
Hi,
you can get the offsets (current and committed offsets) in Flink 1.1 using the Flink metrics.
In Flink 1.0, we expose the Kafka internal metrics via the accumulator system (so you can access them from the web interface as well). IIRC, Kafka exposes a metric for the lag as well.

(In reply to Prabhu's message of Mon, Aug 8, 2016 at 8:15 PM.)

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-kafka-group-question-tp8185p8374.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.