KafkaConsumerBase

KafkaConsumerBase

aitozi

Hi,

I have a question. When we use KafkaConsumerBase, each parallel subtask has to fetch data from a different set of partitions, as in this method from KafkaConsumerBase.java (version 1.2.0):

    protected static List<KafkaTopicPartition> assignPartitions(
            List<KafkaTopicPartition> allPartitions,
            int numConsumers, int consumerIndex) {
        final List<KafkaTopicPartition> thisSubtaskPartitions = new ArrayList<>(
                allPartitions.size() / numConsumers + 1);

        for (int i = 0; i < allPartitions.size(); i++) {
            if (i % numConsumers == consumerIndex) {
                thisSubtaskPartitions.add(allPartitions.get(i));
            }
        }

        return thisSubtaskPartitions;
    }

But I have not found any place that invokes this method. KafkaConsumerThread.java instead uses:

consumerCallBridge.assignPartitions(consumer, convertKafkaPartitions(subscribedPartitions));

I think subscribedPartitions here is all of the partitions, not just this subtask's partitions. Can anyone clarify this?

Re: KafkaConsumerBase

Tzu-Li (Gordon) Tai
Hi!

> method shown in KafkaConsumerBase.java (version 1.2.0)

A lot has changed in the FlinkKafkaConsumerBase since version 1.2.0.
If I remember correctly, the `assignPartitions` method was a leftover that was no longer used anywhere in the code, and it was removed afterwards.
The method for partition assigning in 1.2.0 is called `assignTopicPartitions`, and is used in the open() method.

> consumerCallBridge.assignPartitions(consumer,
> convertKafkaPartitions(subscribedPartitions));
>
> i think here subscribedPartitions is all the partitions , not
> subtaskPartitions.

This code snippet is from `KafkaConsumerThread`, correct?

As stated above, the partitions are still filtered out to only be the partitions for each local subtask, using the `assignTopicPartitions` method. So here, the `subscribedPartitions` is not the complete list of partitions, only the partitions that the subtask should subscribe to.
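To make the per-subtask filtering concrete, here is a minimal, self-contained sketch. It is not Flink source code: the class name `SubtaskPartitionSketch` and the method `partitionsFor` are made up for illustration, and plain integers stand in for `KafkaTopicPartition`. It only assumes the `i % numConsumers == consumerIndex` rule from the method quoted in the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative sketch only, not Flink code. Integers stand in for KafkaTopicPartition.
class SubtaskPartitionSketch {

    // Returns the partitions that one subtask keeps, using the same
    // "index modulo number of subtasks" rule as the quoted 1.2.0 method.
    static List<Integer> partitionsFor(List<Integer> allPartitions, int numSubtasks, int subtaskIndex) {
        List<Integer> mine = new ArrayList<>(allPartitions.size() / numSubtasks + 1);
        for (int i = 0; i < allPartitions.size(); i++) {
            if (i % numSubtasks == subtaskIndex) {
                mine.add(allPartitions.get(i));
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        List<Integer> all = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7);
        int numSubtasks = 3;
        // Every subtask sees the full list but keeps only its own share.
        for (int subtask = 0; subtask < numSubtasks; subtask++) {
            System.out.println("subtask " + subtask + " -> " + partitionsFor(all, numSubtasks, subtask));
        }
    }
}
```

With 8 partitions and 3 subtasks, the subtasks keep [0, 3, 6], [1, 4, 7] and [2, 5] respectively, so every partition is read by exactly one subtask.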


Cheers,
Gordon

On 2 August 2017 at 9:52:03 PM, aitozi ([hidden email]) wrote:



Re: KafkaConsumerBase

aitozi


Hi Gordon,

Yes, I just re-read the code in the assignTopicPartitions method, and it does indeed subscribe only to the partitions that the subtask should subscribe to. Earlier I had not carefully read the for loop in assignTopicPartitions that builds subscribedPartitions for each subtask:

for (int i = getRuntimeContext().getIndexOfThisSubtask();
        i < kafkaTopicPartitions.size();
        i += getRuntimeContext().getNumberOfParallelSubtasks()) {
    subscribedPartitions.add(kafkaTopicPartitions.get(i));
}

You are right: "the partitions are still filtered out to only be the partitions for each local subtask, using the `assignTopicPartitions` method"
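As a sanity check, the strided loop from assignTopicPartitions should pick the same elements as the modulo test in the 1.2.0 `assignPartitions` method from my first message. A small standalone sketch of that comparison follows; it is not Flink code, the names `StrideVsModuloSketch`, `byStride` and `byModulo` are hypothetical, and strings stand in for `KafkaTopicPartition`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative sketch only, not Flink code. Strings stand in for KafkaTopicPartition.
class StrideVsModuloSketch {

    // Stride form: start at the subtask index and jump by the parallelism.
    static <T> List<T> byStride(List<T> all, int numSubtasks, int subtaskIndex) {
        List<T> result = new ArrayList<>();
        for (int i = subtaskIndex; i < all.size(); i += numSubtasks) {
            result.add(all.get(i));
        }
        return result;
    }

    // Modulo form: visit every index and keep those that map to this subtask.
    static <T> List<T> byModulo(List<T> all, int numSubtasks, int subtaskIndex) {
        List<T> result = new ArrayList<>();
        for (int i = 0; i < all.size(); i++) {
            if (i % numSubtasks == subtaskIndex) {
                result.add(all.get(i));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> all = Arrays.asList("t-0", "t-1", "t-2", "t-3", "t-4");
        int numSubtasks = 2;
        for (int subtask = 0; subtask < numSubtasks; subtask++) {
            List<String> stride = byStride(all, numSubtasks, subtask);
            List<String> modulo = byModulo(all, numSubtasks, subtask);
            System.out.println("subtask " + subtask + ": " + stride + " same=" + stride.equals(modulo));
        }
    }
}
```

Both forms give the same list for each subtask, as long as the subtask index is between 0 and the parallelism minus one. The stride form just skips the indices that belong to other subtasks.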

Thanks
aitozi