How to read from a Kafka topic from the beginning

Miaoyongqiang (Will)

Hi,

How can I tell a “FlinkKafkaConsumer” that I want to read from a topic from the beginning?

Thanks,
Will

Re: How to read from a Kafka topic from the beginning

rmetzger0
Hi Will,

In Kafka's consumer configuration [1] there is a configuration parameter called "auto.offset.reset". Setting it to "smallest" will tell the consumer to start reading a topic from the smallest available offset.

You can pass this setting through the Properties object that you give to the Flink Kafka consumer's constructor.
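A minimal sketch, assuming the Kafka 0.8 connector that was current when this thread started: the consumer settings go into a java.util.Properties object. The class name `KafkaReadFromBeginning`, the helper `consumerProps`, the addresses, and the group and topic names are placeholders for illustration. The Flink call itself is only shown as a comment, because the consumer class name depends on the Flink and Kafka versions in use.

```java
import java.util.Properties;

// Hypothetical helper class; only java.util.Properties is a real API here.
class KafkaReadFromBeginning {

    // Builds the Properties handed to the Flink Kafka consumer.
    // Addresses and the group id are placeholders.
    static Properties consumerProps(String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        // The Kafka 0.8 consumer also talks to ZooKeeper, where it stores group offsets.
        props.setProperty("zookeeper.connect", "localhost:2181");
        props.setProperty("group.id", groupId);
        // Kafka 0.8 value: start at the smallest available offset
        // when the group has no valid committed offset.
        props.setProperty("auto.offset.reset", "smallest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("my-flink-job");
        // In a Flink job the properties go into the consumer's constructor, e.g. with
        // the Kafka 0.8 connector (class names differ between Flink versions):
        //   env.addSource(new FlinkKafkaConsumer08<>("my-topic", new SimpleStringSchema(), props));
        System.out.println(props.getProperty("auto.offset.reset")); // smallest
    }
}
```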

[1] http://kafka.apache.org/documentation.html#consumerconfigs

In reply to Miaoyongqiang (Will), Tue, Nov 17, 2015 at 8:55 AM

Re: How to read from a Kafka topic from the beginning

Jonas Gröger
You also need to use a new group.id (one that has no committed offsets yet) for this to work.
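A sketch of one way to get a group without committed offsets: derive a fresh group.id for each run. The class `FreshGroupId`, the helper `withFreshGroup`, and the "replay" prefix are hypothetical. Note the trade-off: because the id is random, offsets committed by earlier runs are never picked up again.

```java
import java.util.Properties;
import java.util.UUID;

// Hypothetical helper; only Properties and UUID are real APIs here.
class FreshGroupId {

    // Returns a copy of the base properties with a group id that has never been used,
    // so Kafka has no committed offsets for it and "auto.offset.reset" applies.
    static Properties withFreshGroup(Properties base, String prefix) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty("group.id", prefix + "-" + UUID.randomUUID());
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("auto.offset.reset", "smallest");
        Properties props = withFreshGroup(base, "replay");
        System.out.println(props.getProperty("group.id")); // e.g. replay-<random UUID>
    }
}
```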

Re: How to read from a Kafka topic from the beginning

Matthias J. Sax-2
In reply to this post by rmetzger0

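I would put this differently: the "auto.offset.reset" policy is only used
if the consumer group has no valid committed offset for the topic's
partitions (for example, because the group is new or its committed
offset no longer exists on the broker).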

See here:
http://stackoverflow.com/documentation/apache-kafka/5449/consumer-groups-and-offset-management

(Don't be confused by "smallest/largest" versus "earliest/latest": the
former are the values for the Kafka 0.8.2 consumer and the latter for
0.9+, but the mechanism is the same.)
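To make the rule concrete, here is a small model of how a consumer group picks its start offset for one partition. It is a sketch, not Kafka's implementation: the class `OffsetResetModel` and its method are hypothetical, and "valid" is assumed to mean that the committed offset still lies within the partition's current log range. It accepts both the 0.8 and the 0.9+ spellings of the policy, since only the names differ.

```java
// Hypothetical model of Kafka's offset-reset rule; not Kafka's actual code.
class OffsetResetModel {

    // committed: the group's committed offset for the partition, or null if none exists.
    // logStart: earliest offset still available; logEnd: offset of the next message to be written.
    static long startOffset(Long committed, String resetPolicy, long logStart, long logEnd) {
        // A committed offset inside the log range always wins; the policy is not consulted.
        if (committed != null && committed >= logStart && committed <= logEnd) {
            return committed;
        }
        // No valid committed offset: fall back to the reset policy.
        switch (resetPolicy) {
            case "smallest": // Kafka 0.8 name
            case "earliest": // Kafka 0.9+ name
                return logStart;
            case "largest":  // Kafka 0.8 name
            case "latest":   // Kafka 0.9+ name
                return logEnd;
            default:
                // "none" makes the 0.9+ consumer fail instead of choosing a position;
                // this model throws for "none" and for unknown values alike.
                throw new IllegalStateException("no valid committed offset, policy: " + resetPolicy);
        }
    }

    public static void main(String[] args) {
        System.out.println(startOffset(null, "earliest", 100, 500)); // 100: nothing committed
        System.out.println(startOffset(250L, "smallest", 100, 500)); // 250: committed offset wins
        System.out.println(startOffset(50L, "latest", 100, 500));    // 500: committed offset expired
    }
}
```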

But I thought Flink does not rely on committed consumer offsets and does
"manual" offset management instead? So I am wondering whether this
property is passed into the Kafka consumer inside Flink's Kafka source
operator at all.


-Matthias


In reply to Robert Metzger, 11/17/15 2:30 AM

Re: How to read from a Kafka topic from the beginning

Tzu-Li (Gordon) Tai
Hi,

Yes, Flink does not rely on consumer offset commits in Kafka / ZooKeeper. It manages offsets as checkpointed state within Flink and uses those offsets to provide exactly-once guarantees.
Currently, “auto.offset.reset” is passed into the internally used KafkaConsumer as a means to define the start position. However, there’s ongoing work to further decouple this, so that respecting existing offsets in Kafka becomes merely one of several possible ways to define the consumer’s start position, and Flink doesn’t depend on them in any way.
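The order of precedence described here can be sketched as follows. This is a hedged model under stated assumptions, not Flink's code: the class `FlinkStartPositionModel` and its method are hypothetical, and it reflects the default start mode in which the internal KafkaConsumer respects the group's committed offsets. In later Flink versions (1.3 and up) the consumer also offers explicit start-position methods such as `setStartFromEarliest()`, which ignore committed group offsets and read from the earliest offset; offsets restored from a checkpoint or savepoint still take precedence over them.

```java
// Hypothetical model of how a Flink Kafka source chooses a partition's start offset
// in its default mode; not Flink's actual implementation.
class FlinkStartPositionModel {

    // checkpointed: offset restored from a Flink checkpoint/savepoint, or null on a fresh start.
    // committed: the group's committed offset in Kafka/ZooKeeper, or null if none.
    // resetToEarliest: true if "auto.offset.reset" is "smallest"/"earliest".
    static long startOffset(Long checkpointed, Long committed, boolean resetToEarliest,
                            long logStart, long logEnd) {
        // 1. Restore: Flink's own checkpointed offsets take precedence over anything in Kafka.
        if (checkpointed != null) {
            return checkpointed;
        }
        // 2. Fresh start: use the group's committed offset if it is still valid.
        if (committed != null && committed >= logStart && committed <= logEnd) {
            return committed;
        }
        // 3. Otherwise "auto.offset.reset" decides.
        return resetToEarliest ? logStart : logEnd;
    }

    public static void main(String[] args) {
        // Restored job: the checkpointed offset is used even though a committed one exists.
        System.out.println(startOffset(420L, 300L, true, 100, 500)); // 420
        // Fresh job with a new group.id: nothing committed, so read from the beginning.
        System.out.println(startOffset(null, null, true, 100, 500)); // 100
    }
}
```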

There’s more detail on this in these JIRAs:

Cheers,
Gordon

In reply to Matthias J. Sax, January 16, 2017 at 8:58:21 PM