FlinkKafkaConsumer and Kafka topic/partition change

FlinkKafkaConsumer and Kafka topic/partition change

Hironori Ogibayashi
Hello,

I want FlinkKafkaConsumer to follow changes to Kafka topics and partitions.
This means:
- When we add partitions to a topic, we want FlinkKafkaConsumer to
start reading added partitions.
- We want to specify topics by pattern (e.g. accesslog.*), and want
FlinkKafkaConsumer to start reading new matching topics if they appear after
the job has started.
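Both requirements come down to periodically re-reading topic metadata and diffing it against what the consumer already reads. A minimal sketch, assuming a hypothetical metadata map of topic name to partition count (this is not a FlinkKafkaConsumer or Kafka API), and treating `accesslog.*` as a regular expression:

```python
import re
from typing import Dict, Set, Tuple

# A (topic, partition) pair, mirroring the idea of Kafka's TopicPartition.
TopicPartition = Tuple[str, int]


def discover_new_partitions(
    pattern: str,
    metadata: Dict[str, int],
    known: Set[TopicPartition],
) -> Set[TopicPartition]:
    """Return partitions whose topic matches `pattern` and that are not in `known`.

    `metadata` maps topic name -> current partition count, as a periodic
    metadata fetch might report it (hypothetical input shape).
    """
    regex = re.compile(pattern)
    current = {
        (topic, p)
        for topic, count in metadata.items()
        if regex.fullmatch(topic)
        for p in range(count)
    }
    return current - known


# Job start: one matching topic with two partitions; "metrics" is ignored.
known = discover_new_partitions(r"accesslog.*", {"accesslog.web": 2, "metrics": 4}, set())

# Later: a partition is added to accesslog.web and a new matching topic appears.
later = {"accesslog.web": 3, "accesslog.api": 1, "metrics": 4}
new = discover_new_partitions(r"accesslog.*", later, known)
print(sorted(new))  # [('accesslog.api', 0), ('accesslog.web', 2)]
```

A real consumer would run the diff on a timer and start reading whatever it returns.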

From reading the source code and from my experiments, FlinkKafkaConsumer
uses KafkaConsumer.assign() instead of subscribe(), so partitions are
assigned to each KafkaConsumer instance only once, when the job
starts.
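The behaviour described above can be modelled as a one-off computation: the assignment is derived from the metadata seen at startup and never recomputed. A simplified model; round-robin over a sorted partition list is an assumption for illustration, not necessarily FlinkKafkaConsumer's exact scheme:

```python
from typing import Dict, List, Tuple

TopicPartition = Tuple[str, int]


def assign_once(metadata: Dict[str, int], num_subtasks: int) -> Dict[int, List[TopicPartition]]:
    """Spread all partitions over subtasks round-robin, computed once at job start."""
    partitions = sorted((t, p) for t, n in metadata.items() for p in range(n))
    assignment: Dict[int, List[TopicPartition]] = {i: [] for i in range(num_subtasks)}
    for index, tp in enumerate(partitions):
        assignment[index % num_subtasks].append(tp)
    return assignment


startup = assign_once({"accesslog.web": 3}, num_subtasks=2)
# {0: [('accesslog.web', 0), ('accesslog.web', 2)], 1: [('accesslog.web', 1)]}

# A fourth partition added later is never read: nothing recomputes `startup`.
assigned = {tp for tps in startup.values() for tp in tps}
print(("accesslog.web", 3) in assigned)  # False
```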

Is there any way to make FlinkKafkaConsumer follow topic/partition changes?

Regards,
Hironori Ogibayashi

Re: FlinkKafkaConsumer and Kafka topic/partition change

Tzu-Li (Gordon) Tai
Hi!

This is definitely a planned feature for the Kafka connectors; there’s a JIRA for exactly this [1].
We’re currently working through some blocking tasks to make this happen, and I also hope to speed things up there :)

Your observation is correct: the Kafka consumer uses “assign()” instead of “subscribe()”.
This is because Flink needs the partition-to-subtask assignment to be deterministic
for exactly-once semantics.
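Why determinism matters: when a job restores from a checkpoint, each parallel subtask must end up owning exactly the partitions whose offsets it checkpointed, without negotiating with a broker-side group coordinator. A sketch of one deterministic assignment; the hash-plus-offset scheme and the function names are illustrative assumptions, not Flink's actual code:

```python
import zlib
from typing import List, Tuple

TopicPartition = Tuple[str, int]


def owner_subtask(tp: TopicPartition, num_subtasks: int) -> int:
    """Map a partition to a subtask using only stable inputs.

    crc32 is used instead of Python's built-in hash(), which is salted per
    process for strings and would break determinism across restarts.
    """
    topic, partition = tp
    start = zlib.crc32(topic.encode("utf-8")) % num_subtasks
    return (start + partition) % num_subtasks


def partitions_for(subtask: int, all_tps: List[TopicPartition], num_subtasks: int) -> List[TopicPartition]:
    """Each subtask filters the same list independently; no coordination needed."""
    return [tp for tp in all_tps if owner_subtask(tp, num_subtasks) == subtask]


tps = [("accesslog.web", p) for p in range(4)]
first_run = [partitions_for(i, tps, 2) for i in range(2)]
restored = [partitions_for(i, tps, 2) for i in range(2)]
assert first_run == restored  # same subtask owns the same partitions after restore
```

With subscribe(), by contrast, Kafka's group coordinator decides the assignment and may move partitions between consumers during a rebalance, so a partition can end up on a different subtask than the one that checkpointed its offset.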
If you’re not concerned about exactly-once and want to experiment in the meantime, before [1] lands,
I believe Robert recently implemented a Kafka consumer that uses “subscribe()”, so the set of Kafka
topics can scale (looping in Robert to provide more info on this one).

Best Regards,
Gordon

[1] https://issues.apache.org/jira/browse/FLINK-4022


On September 27, 2016 at 6:17:06 PM, Hironori Ogibayashi ([hidden email]) wrote:

Re: FlinkKafkaConsumer and Kafka topic/partition change

Hironori Ogibayashi
Gordon,

Thank you for your quick response!
I am looking forward to that feature. I will periodically check that JIRA.

I am also interested in Robert's implementation, because my current use case is
system monitoring, where scalability has a higher priority than correctness.

Regards,
Hironori

2016-09-27 19:53 GMT+09:00 Tzu-Li (Gordon) Tai <[hidden email]>: