Replays message in Kafka topics with FlinkKafkaConsumer09

Jack Huang
Hi all,

I am trying to force my job to reprocess old messages in my Kafka topics but couldn't get it to work. Here is my FlinkKafkaConsumer09 setup:

import java.util.Properties

val kafkaProp = new Properties()
kafkaProp.setProperty("bootstrap.servers", "localhost:6667")
// Intended to make the consumer start from the earliest available offset
kafkaProp.setProperty("auto.offset.reset", "earliest")

env.addSource(new FlinkKafkaConsumer09[String](input, new SimpleStringSchema, kafkaProp))
    .print()

I thought auto.offset.reset would do the trick. What am I missing here?


Thanks,

Jack Huang

Re: Replays message in Kafka topics with FlinkKafkaConsumer09

Aljoscha Krettek
Hi,
I think the "auto.offset.reset" parameter is only used if your consumer group has never read from the topic, i.e. Kafka has no committed offset for it. To simulate being a new consumer, you can set the "group.id" property to a new random value.
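
A minimal sketch of that idea, assuming the job is started fresh rather than restored from a checkpoint. The helper name replayProperties and the "replay-" prefix are made up for illustration; only the property keys come from the thread. It runs as a Scala script or in the REPL:

```scala
import java.util.{Properties, UUID}

// Hypothetical helper: builds consumer properties with a fresh group.id on
// every call, so Kafka has no committed offsets for that group and the
// consumer falls back to auto.offset.reset.
def replayProperties(bootstrapServers: String): Properties = {
  val props = new Properties()
  props.setProperty("bootstrap.servers", bootstrapServers)
  // The "replay-" prefix is arbitrary; the random UUID makes the group new each run.
  props.setProperty("group.id", s"replay-${UUID.randomUUID()}")
  // Only consulted when the group has no committed offset for a partition.
  props.setProperty("auto.offset.reset", "earliest")
  props
}

val kafkaProp = replayProperties("localhost:6667")
```

The resulting kafkaProp can be passed to the FlinkKafkaConsumer09 constructor exactly as in the original setup. Since every run creates a new consumer group, this is probably best kept for ad hoc reprocessing rather than for normal operation.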

Cheers,
Aljoscha

(In reply to Jack Huang's message of Fri, 22 Apr 2016 at 03:10.)