FlinkKafkaConsumer problem

classic Classic list List threaded Threaded
2 messages Options
op
Reply | Threaded
Open this post in threaded view
|

FlinkKafkaConsumer problem

op
    hi,
    i am confused about consumer group of FlinkKafkaConsumer, 
    i have two applications,with the same code like this:
//---------------------------
 val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
Env.setRestartStrategy(RestartStrategies.noRestart())
val consumerProps = new Properties()
consumerProps.put("bootstrap.servers", brokers)
consumerProps.put("group.id", "test1234")

val consumer = new FlinkKafkaConsumer[String](topic,new KafkaStringSchema,consumerProps).setStartFromLatest()
Env.addSource(consumer).print()
Env.execute()
//-----------------------------------
then i launch both,they have the same topic and  group.id,and when i send some message to the topic,
i find both application consume all the data ,which does‘t behave as kafka consumer group,
can someone tell me why?
Reply | Threaded
Open this post in threaded view
|

Re: FlinkKafkaConsumer problem

Till Rohrmann
The reason two Flink jobs using a Kafka consumer with the same consumer group are seeing the same events is that Flink's FlinkKafkaConsumer does not participate in Kafka's consumer group management. Instead Flink manually assigns all partitions to the source operators (on a per job basis). The consumer group will only be used to commit the current offset to the Kafka brokers.

Cheers,
Till

On Wed, Sep 2, 2020 at 9:42 AM op <[hidden email]> wrote:
    hi,
    i am confused about consumer group of FlinkKafkaConsumer, 
    i have two applications,with the same code like this:
//---------------------------
 val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
Env.setRestartStrategy(RestartStrategies.noRestart())
val consumerProps = new Properties()
consumerProps.put("bootstrap.servers", brokers)
consumerProps.put("group.id", "test1234")

val consumer = new FlinkKafkaConsumer[String](topic,new KafkaStringSchema,consumerProps).setStartFromLatest()
Env.addSource(consumer).print()
Env.execute()
//-----------------------------------
then i launch both,they have the same topic and  group.id,and when i send some message to the topic,
i find both application consume all the data ,which does‘t behave as kafka consumer group,
can someone tell me why?