Does the Flink Kafka connector have a max_pending_offsets concept?


Does the Flink Kafka connector have a max_pending_offsets concept?

xwang355

Hi Flink users,

When the number of Kafka consumers equals the number of partitions, and I use setParallelism(>1), something like this:

'messageStream.rebalance().map(lambda).setParallelism(3).print()'

how do I tune the number of outstanding uncommitted offsets? Something similar to

https://storm.apache.org/releases/1.1.2/storm-kafka-client.html in Storm.

Thanks
Ben

Re: Does the Flink Kafka connector have a max_pending_offsets concept?

Elias Levy
There is no such concept in Flink. Flink tracks offsets in its checkpoints. It can optionally commit offsets back to Kafka, but that is only for reporting purposes. If you wish to reduce the number of records that get reprocessed in the case of a restart, you must lower the checkpoint interval.
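To make the trade-off concrete, here is a toy sketch in Python (not Flink code; the function name is illustrative) of the idea above: on restart Flink rewinds to the offsets stored in the last completed checkpoint, so everything read after that checkpoint is reprocessed, and a shorter checkpoint interval shrinks that window.

```python
def replayed_after_failure(checkpointed_offsets, processed_through):
    """Offsets reprocessed after a restart: everything past the last
    completed checkpoint, up to where processing had reached."""
    last = max(checkpointed_offsets) if checkpointed_offsets else 0
    return list(range(last + 1, processed_through + 1))

# Checkpoints completed at offsets 100 and 200; failure strikes at offset 230:
# all 30 records after offset 200 are read again on restart.
assert replayed_after_failure([100, 200], 230) == list(range(201, 231))

# Checkpointing more often (an extra checkpoint at 225) bounds the replay to 5 records.
assert len(replayed_after_failure([100, 200, 225], 230)) == 5
```

This is only a model of the replay window, not of Flink's checkpoint mechanism itself.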

On Tue, Jun 4, 2019 at 10:47 AM wang xuchen <[hidden email]> wrote:


Re: Does the Flink Kafka connector have a max_pending_offsets concept?

xwang355
Elias, thanks for your reply. In this case, when the number of Kafka consumers equals the number of partitions, and I use setParallelism(>1), something like this:

'messageStream.rebalance().map(lambda).setParallelism(3).print()'

If checkpointing is enabled, I assume Flink will commit the offsets in the
'right order' during a checkpoint.


For example, if a batch comprises offsets (1, 2, 3, 4, 5) and there are
three worker threads (setParallelism(3)):

thread 1 -> 1     [stuck in a synchronous call]
thread 2 -> 2, 3  [success]
thread 3 -> 4, 5  [success]

Will Flink commit offset 5?

I just want to make sure that Flink manages the pending offsets
correctly, so that no data is lost if the above code is used in
production.

Thanks again!




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Does the Flink Kafka connector have a max_pending_offsets concept?

Fabian Hueske-2
Hi Ben,

Flink correctly maintains the offsets of all partitions that are read by a Kafka consumer.
A checkpoint is only complete when all functions have successfully checkpointed their state. For a Kafka consumer, this state is the current reading offset.
In case of a failure, the offsets and the state of all functions are reset to the last completed checkpoint.
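The scenario from the previous message can be sketched as a toy model in Python (not Flink internals; names are illustrative). Because a checkpoint only counts once every parallel subtask has snapshotted its state, a subtask stuck on an early record (offset 1) keeps the checkpoint from completing, so the restore point stays at the previous checkpoint and the stuck record cannot be skipped over.

```python
def restore_offset(source_offset, acked_subtasks, parallelism, prev_offset):
    """The offset Flink would rewind to: the new checkpoint's source offset
    only if every subtask acknowledged it, otherwise the previous one."""
    return source_offset if len(acked_subtasks) == parallelism else prev_offset

# Source has read through offset 5, but subtask 1 (stuck on record 1) has not
# acknowledged the checkpoint: the restore point stays at the prior checkpoint (0),
# so records 1-5 would all be replayed after a failure; none are lost.
assert restore_offset(5, acked_subtasks={2, 3}, parallelism=3, prev_offset=0) == 0

# Once the stuck subtask finishes and acknowledges, the checkpoint completes at 5.
assert restore_offset(5, acked_subtasks={1, 2, 3}, parallelism=3, prev_offset=0) == 5
```

The model only captures the all-subtasks-must-acknowledge rule; the real mechanism uses checkpoint barriers flowing through the dataflow.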

Best, Fabian

On Wed, Jun 5, 2019 at 10:58 PM xwang355 <[hidden email]> wrote:

Re: Does the Flink Kafka connector have a max_pending_offsets concept?

xwang355
Thanks Fabian. This is really helpful.


