How to get latest offsets with FlinkKafkaConsumer


How to get latest offsets with FlinkKafkaConsumer

Mao, Wei

I am running some performance tests with Flink (1.0.3) + Kafka (0.8.2.2). I noticed that when I restart my Flink application, it resumes reading from the last offset I consumed previously, not from the latest offset of the topic in Kafka.

Is there any way to make Flink read from the latest offsets of broker/MyTopic instead of those of consumer/MyTopic?


Thanks,

William

 


Re: How to get latest offsets with FlinkKafkaConsumer

Stefan Richter
Hi,

I think passing properties with setProperty("auto.offset.reset", "smallest") to the Kafka consumer should do what you want.

Best,
Stefan


In reply to the message Mao, Wei <[hidden email]> wrote on 05.08.2016 at 14:36.


Re: How to get latest offsets with FlinkKafkaConsumer

Stefan Richter
Sorry, I think you are actually asking for the largest offset in the Kafka source, which makes it setProperty("auto.offset.reset", "largest").
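
A minimal sketch of the setting above, assuming the consumer is configured through a plain `java.util.Properties` object, as the Flink connector for Kafka 0.8 (`FlinkKafkaConsumer08`) expects. The class name `LatestOffsetProps`, the host addresses, and the group id are hypothetical placeholders; only the `auto.offset.reset` key and the value `largest` come from this thread.

```java
import java.util.Properties;

// Hypothetical helper; names and addresses are illustrative only.
final class LatestOffsetProps {

    // Builds consumer properties for a Kafka 0.8 consumer.
    static Properties build(String zookeeperConnect, String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", zookeeperConnect);
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Kafka 0.8 accepts "smallest" or "largest" for this key.
        props.setProperty("auto.offset.reset", "largest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:2181", "localhost:9092", "perf-test");
        // Assumption: these properties would then be handed to the consumer, e.g.
        // new FlinkKafkaConsumer08<>("MyTopic", new SimpleStringSchema(), props)
        System.out.println(props.getProperty("auto.offset.reset"));
    }
}
```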

In reply to the message Stefan Richter <[hidden email]> wrote on 05.08.2016 at 14:44.



Re: How to get latest offsets with FlinkKafkaConsumer

Tzu-Li Tai
Hi,

Please also note that the “auto.offset.reset” property is only respected when there are no offsets under the same consumer group in ZK. So, currently, to make sure you read from the latest / earliest offsets every time you restart your Flink application, you’d have to use a unique groupId on each restart.
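
One way the unique-groupId workaround could be sketched, assuming the consumer properties are built in application code before the job starts. The class `FreshGroupId`, its method names, and the `perf-test` prefix are hypothetical; the random suffix comes from the standard `java.util.UUID`.

```java
import java.util.Properties;
import java.util.UUID;

// Hypothetical helper; not part of Flink or Kafka.
final class FreshGroupId {

    // Returns a group id that is different on every call, e.g. "perf-test-<random uuid>".
    static String uniqueGroupId(String prefix) {
        return prefix + "-" + UUID.randomUUID();
    }

    // Copies the given properties and replaces group.id with a fresh value,
    // so no committed offsets exist yet for the group and auto.offset.reset applies.
    static Properties withFreshGroup(Properties base, String prefix) {
        Properties copy = new Properties();
        copy.putAll(base);
        copy.setProperty("group.id", uniqueGroupId(prefix));
        return copy;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("auto.offset.reset", "largest");
        Properties props = withFreshGroup(base, "perf-test");
        System.out.println(props.getProperty("group.id"));
    }
}
```

Each restart then leaves an unused group entry behind, which is the trade-off of this approach.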

We’re currently working on a new configuration option for the Kafka consumer that explicitly sets the starting offset / position without respecting existing offsets in ZK. You can follow the corresponding JIRA here: https://issues.apache.org/jira/browse/FLINK-4280.

Regards,
Gordon

In reply to the message Stefan Richter ([hidden email]) wrote on August 5, 2016 at 8:47:32 PM.



RE: How to get latest offsets with FlinkKafkaConsumer

Mao, Wei

Thank you, Stefan and Gordon. This is really helpful.

I will try the “auto.offset.reset” property. Instead of using a new consumer group every time, I would like to clear the offsets under the current consumer group before restarting the Flink application, to avoid accumulating redundant records in ZK.
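
A rough sketch of which ZooKeeper nodes such a cleanup would target, assuming offsets are stored in the Kafka 0.8 high-level consumer layout `/consumers/<group>/offsets/<topic>/<partition>`. The class `ConsumerOffsetPaths` and the example group and topic names are hypothetical, and the deletion itself (with whatever ZooKeeper client is at hand) is deliberately not shown.

```java
// Hypothetical helper that only computes znode paths; it does not talk to ZooKeeper.
final class ConsumerOffsetPaths {

    // Parent node holding the committed offsets of one group for one topic.
    static String topicOffsetsPath(String groupId, String topic) {
        return "/consumers/" + groupId + "/offsets/" + topic;
    }

    // Node holding the committed offset of a single partition.
    static String partitionOffsetPath(String groupId, String topic, int partition) {
        return topicOffsetsPath(groupId, topic) + "/" + partition;
    }

    public static void main(String[] args) {
        // prints "/consumers/perf-test/offsets/MyTopic/0"
        System.out.println(partitionOffsetPath("perf-test", "MyTopic", 0));
    }
}
```

Removing the topic-level node while the job is stopped would leave the group without committed offsets, so “auto.offset.reset” should apply on the next start.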

 

Regards,

William  

 

In reply to the message Tzu-Li (Gordon) Tai [mailto:[hidden email]] sent on Friday, August 5, 2016 8:57 PM (Subject: Re: How to get latest offsets with FlinkKafkaConsumer).