The code of kafka partition assign is like follows:
public static int assign(KafkaTopicPartition partition, int
numParallelSubtasks) {
int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) %
numParallelSubtasks;
// here, the assumption is that the id of Kafka partitions are always
ascending
// starting from 0, and therefore can be used directly as the offset
clockwise from the start index
return (startIndex + partition.getPartition()) % numParallelSubtasks;
}
It seems it will assign to multi sub tasks.
I wonder how flink ensure some subtasks will simply remain idle
--
Sent from:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/