Hi,

How can I tell a “FlinkKafkaConsumer” that I want to read from a topic from the beginning?

Thanks,
Will
Hi Will,

In Kafka's consumer configuration [1] there is a configuration parameter called "auto.offset.reset". Setting it to "smallest" tells the consumer to start reading a topic from the smallest available offset. You can pass the configuration through the Properties you give to the Kafka consumer.

[1] http://kafka.apache.org/documentation.html#consumerconfigs

On Tue, Nov 17, 2015 at 8:55 AM, Miaoyongqiang (Will) <[hidden email]> wrote:
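For illustration, here is a minimal sketch of building those consumer properties in Java. The broker and ZooKeeper addresses, the group id and the topic name are placeholders, and the class `KafkaSourceConfig` is hypothetical. The commented-out line assumes the Kafka 0.8 connector (`FlinkKafkaConsumer08` in Flink 1.x). The exact consumer class depends on your Flink and Kafka versions, and with the Kafka 0.9+ consumer the value would be "earliest" rather than "smallest".

```java
import java.util.Properties;

public class KafkaSourceConfig {

    // Builds consumer properties that ask Kafka to start from the smallest
    // available offset. Addresses and group id are placeholder assumptions.
    public static Properties earliestOffsetProps(String groupId) {
        Properties props = new Properties();
        // Settings expected by the Kafka 0.8 consumer used by Flink's 0.8 connector
        props.setProperty("zookeeper.connect", "localhost:2181");
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", groupId);
        // Applies when the group has no valid committed offset.
        // For the Kafka 0.9+ consumer use "earliest" instead of "smallest".
        props.setProperty("auto.offset.reset", "smallest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = earliestOffsetProps("my-consumer-group");
        // With the Flink Kafka 0.8 connector on the classpath (assumption), the
        // properties would be handed to the source roughly like this:
        // env.addSource(new FlinkKafkaConsumer08<>("my-topic", new SimpleStringSchema(), props));
        System.out.println(props.getProperty("auto.offset.reset"));
    }
}
```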
You also need to use a new group.id for this to work.
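The reason a fresh group.id matters can be sketched with a small model: committed offsets are kept per consumer group, and the reset policy is consulted only when the group has no valid committed offset for a partition. This is a simplified, hypothetical model (`OffsetResetModel` is not a Kafka or Flink API). In real Kafka the committed offsets typically live in ZooKeeper for the 0.8 consumer and in the internal `__consumer_offsets` topic for the 0.9+ consumer.

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetResetModel {

    // Hypothetical stand-in for offsets committed per (group id, partition).
    private final Map<String, Long> committed = new HashMap<>();

    public void commit(String groupId, int partition, long offset) {
        committed.put(groupId + "/" + partition, offset);
    }

    // Returns the offset a consumer in this group would start reading from.
    public long startOffset(String groupId, int partition, String resetPolicy,
                            long smallestAvailable, long largestAvailable) {
        Long c = committed.get(groupId + "/" + partition);
        // A valid committed offset wins: auto.offset.reset is ignored.
        if (c != null && c >= smallestAvailable && c <= largestAvailable) {
            return c;
        }
        // No (valid) committed offset: fall back to the reset policy.
        // "smallest"/"largest" are the 0.8 names, "earliest"/"latest" the 0.9+ names.
        switch (resetPolicy) {
            case "smallest":
            case "earliest":
                return smallestAvailable;
            case "largest":
            case "latest":
                return largestAvailable;
            default:
                throw new IllegalArgumentException("unknown auto.offset.reset value: " + resetPolicy);
        }
    }
}
```

For example, after `commit("old-group", 0, 500)`, calling `startOffset("old-group", 0, "smallest", 100, 900)` resumes at 500, while the same call with a group id that has never committed starts at 100.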
I would put this differently: the "auto.offset.reset" policy is only used if there are no valid committed offsets for a topic. See here: http://stackoverflow.com/documentation/apache-kafka/5449/consumer-groups-and-offset-management

(Don't be confused by "smallest/largest" versus "earliest/latest": the former is for Kafka 0.8.2 and the latter for 0.9+, but the mechanism is the same.)

But I thought Flink does not rely on consumer offset commits and does "manual" offset management instead? So I am wondering whether this property is passed to the Kafka consumer inside Flink's Kafka source operator at all.

-Matthias

On 11/17/15 2:30 AM, Robert Metzger wrote:
Hi,

Yes, Flink does not rely on consumer offset commits in Kafka / ZooKeeper. It manages offsets as checkpointed state within Flink and uses those offsets for exactly-once guarantees.

Currently, "auto.offset.reset" is passed to the internally used KafkaConsumer as a means to define the start position. There is work underway to further "decouple" this, so that respecting existing offsets in Kafka becomes merely one of several possible ways to define the consumer's start position, and Flink does not depend on them in any way. There's more detail on this in these JIRAs:

Cheers,
Gordon

On January 16, 2017 at 8:58:21 PM, Matthias J. Sax ([hidden email]) wrote:
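A rough model of the precedence described here: offsets restored from a Flink checkpoint take priority, and only on a fresh start does a configured start position apply, with committed group offsets being just one of the options. `StartPositionModel` and its `StartMode` enum are hypothetical, and the restored state is assumed to hold the next offset to read per partition. For reference, later Flink releases (1.3 onward) expose this choice on the consumer through `setStartFromEarliest()`, `setStartFromLatest()` and `setStartFromGroupOffsets()`, and those settings do not override offsets restored from a checkpoint or savepoint.

```java
import java.util.Map;

public class StartPositionModel {

    // Hypothetical start modes, loosely mirroring the options Flink later exposed.
    public enum StartMode { GROUP_OFFSETS, EARLIEST, LATEST }

    // restoredState: partition -> next offset to read, taken from a checkpoint
    // (null when the job starts fresh). committedGroupOffset may be null.
    public static long resolve(Map<Integer, Long> restoredState, int partition,
                               StartMode mode, Long committedGroupOffset,
                               long earliest, long latest) {
        // 1. Checkpointed offsets always win; exactly-once relies on them,
        //    not on anything committed to Kafka / ZooKeeper.
        if (restoredState != null && restoredState.containsKey(partition)) {
            return restoredState.get(partition);
        }
        // 2. Fresh start: the configured start position decides.
        switch (mode) {
            case EARLIEST:
                return earliest;
            case LATEST:
                return latest;
            case GROUP_OFFSETS:
            default:
                // Committed group offsets are only one possible source. Falling back
                // to earliest is a simplification standing in for auto.offset.reset.
                return committedGroupOffset != null ? committedGroupOffset : earliest;
        }
    }
}
```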