Hi,

I have a question. When we use KafkaConsumerBase, each parallel subtask has to fetch data from a different set of partitions, as in this method from KafkaConsumerBase.java (version 1.2.0):

    protected static List<KafkaTopicPartition> assignPartitions(
            List<KafkaTopicPartition> allPartitions,
            int numConsumers, int consumerIndex) {
        final List<KafkaTopicPartition> thisSubtaskPartitions = new ArrayList<>(
                allPartitions.size() / numConsumers + 1);

        for (int i = 0; i < allPartitions.size(); i++) {
            if (i % numConsumers == consumerIndex) {
                thisSubtaskPartitions.add(allPartitions.get(i));
            }
        }

        return thisSubtaskPartitions;
    }

But I have not found any place that invokes this method. KafkaConsumerThread.java uses

    consumerCallBridge.assignPartitions(consumer, convertKafkaPartitions(subscribedPartitions));

and I think subscribedPartitions here is all the partitions, not only the subtask's partitions. Can anyone help me with this? |
Hi!

> method shown in KafkaConsumerBase.java (version 1.2.0)

If I remember correctly, the `assignPartitions` method was a leftover that was no longer used in the code, and it was removed afterwards. The method that assigns partitions in 1.2.0 is called `assignTopicPartitions`, and it is used in the open() method.

> consumerCallBridge.assignPartitions(consumer,

This code snippet is from `KafkaConsumerThread`, correct? As stated above, the partitions are still filtered down to only the partitions for each local subtask, using the `assignTopicPartitions` method. So here, `subscribedPartitions` is not the complete list of partitions, only the partitions that the subtask should subscribe to.

Cheers,
Gordon

On 2 August 2017 at 9:52:03 PM, aitozi ([hidden email]) wrote:
|
Hi Gordon,

Yes, I just read the code in the assignTopicPartitions method again, and it does subscribe only to the partitions that the subtask should subscribe to. Earlier I had not read carefully the for loop in assignTopicPartitions that builds subscribedPartitions for each subtask:

    for (int i = getRuntimeContext().getIndexOfThisSubtask();
            i < kafkaTopicPartitions.size();
            i += getRuntimeContext().getNumberOfParallelSubtasks()) {
        subscribedPartitions.add(kafkaTopicPartitions.get(i));
    }

You are right: "the partitions are still filtered out to only be the partitions for each local subtask, using the `assignTopicPartitions` method".

Thanks,
aitozi |
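For anyone reading this thread later, here is a minimal standalone sketch of the two loops quoted above. It suggests that the modulo check in `assignPartitions` and the strided loop in `assignTopicPartitions` select the same partitions for a given subtask. The class name `PartitionAssignmentSketch`, the method names, and the use of plain integers in place of `KafkaTopicPartition` are assumptions made for illustration; this is not the connector's actual code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class; partitions are modelled as plain integers.
public class PartitionAssignmentSketch {

    // Modulo form, mirroring the assignPartitions loop quoted in the question.
    static List<Integer> assignByModulo(List<Integer> allPartitions, int numSubtasks, int subtaskIndex) {
        List<Integer> result = new ArrayList<>(allPartitions.size() / numSubtasks + 1);
        for (int i = 0; i < allPartitions.size(); i++) {
            if (i % numSubtasks == subtaskIndex) {
                result.add(allPartitions.get(i));
            }
        }
        return result;
    }

    // Strided form, mirroring the assignTopicPartitions loop quoted in the follow-up.
    static List<Integer> assignByStride(List<Integer> allPartitions, int numSubtasks, int subtaskIndex) {
        List<Integer> result = new ArrayList<>();
        for (int i = subtaskIndex; i < allPartitions.size(); i += numSubtasks) {
            result.add(allPartitions.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        // Seven partitions spread over three parallel subtasks.
        List<Integer> partitions = new ArrayList<>();
        for (int p = 0; p < 7; p++) {
            partitions.add(p);
        }
        int numSubtasks = 3;
        for (int s = 0; s < numSubtasks; s++) {
            // Prints: subtask 0 -> [0, 3, 6], subtask 1 -> [1, 4], subtask 2 -> [2, 5]
            System.out.println("subtask " + s + " -> " + assignByStride(partitions, numSubtasks, s));
        }
    }
}
```

Each subtask ends up with a disjoint slice of the partition list, which is why `subscribedPartitions` in `KafkaConsumerThread` holds only the local subtask's partitions.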