What happens if my parallelism is higher than the number of Kafka partitions?

yunfan123
It seems the same partition's data would be consumed multiple times?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: What happens if my parallelism is higher than the number of Kafka partitions?

Tzu-Li (Gordon) Tai
Hi!

You can set the parallelism of the Flink Kafka Consumer independently of the number of partitions.
If there are more consumer subtasks than Kafka partitions to read (i.e. when the consumer's parallelism is set higher than the number of partitions), some subtasks will simply remain idle.
Each Kafka partition is deterministically assigned to a single consumer subtask, so no partition is consumed more than once.

Cheers,
Gordon


In reply to yunfan123's message of 8 November 2017, 4:21:54 PM.

Re: What happens if my parallelism is higher than the number of Kafka partitions?

yunfan123
The Kafka partition assignment code looks like this:

public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
    int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;

    // here, the assumption is that the id of Kafka partitions are always ascending
    // starting from 0, and therefore can be used directly as the offset clockwise from the start index
    return (startIndex + partition.getPartition()) % numParallelSubtasks;
}

It seems this would assign partitions to multiple subtasks.
I wonder how Flink ensures that some subtasks simply remain idle.
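
For anyone who wants to try the arithmetic outside of Flink, here is a minimal standalone sketch of the method quoted above. It is not the Flink class itself: `AssignSketch`, `busySubtasks`, and the topic name "my-topic" are made up for illustration, and a plain topic string plus partition id stand in for `KafkaTopicPartition`. It shows that the method returns exactly one subtask index per partition, so with 3 partitions and a parallelism of 5, only 3 distinct indices are ever returned.

```java
import java.util.Set;
import java.util.TreeSet;

public class AssignSketch {

    // Same arithmetic as the quoted method, but taking the topic name and
    // partition id directly (an assumed stand-in for KafkaTopicPartition).
    public static int assign(String topic, int partition, int numParallelSubtasks) {
        // The mask clears the sign bit, so startIndex is never negative.
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        // Partition ids 0, 1, 2, ... are spread round-robin from startIndex.
        return (startIndex + partition) % numParallelSubtasks;
    }

    // Collects every subtask index that is returned for at least one partition.
    public static Set<Integer> busySubtasks(String topic, int numPartitions, int parallelism) {
        Set<Integer> busy = new TreeSet<>();
        for (int p = 0; p < numPartitions; p++) {
            busy.add(assign(topic, p, parallelism));
        }
        return busy;
    }

    public static void main(String[] args) {
        int parallelism = 5;
        int numPartitions = 3;
        for (int p = 0; p < numPartitions; p++) {
            System.out.println("partition " + p + " -> subtask "
                    + assign("my-topic", p, parallelism));
        }
        System.out.println("subtasks that own a partition: "
                + busySubtasks("my-topic", numPartitions, parallelism));
    }
}
```

The indices that never show up in the printed set are the ones that have nothing to read. Which indices those are depends on the topic name's hash, so the sketch does not claim a specific output.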

Re: What happens if my parallelism is higher than the number of Kafka partitions?

Tzu-Li (Gordon) Tai
The `KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks)` method returns the index of the target subtask for a given Kafka partition.
The implementation in that method ensures that the same subtask index will always be returned for the same partition.

Each consumer subtask will locally invoke this assignment method for each Kafka partition.
If the returned subtask index doesn’t equal the subtask’s index, that partition will be filtered out and not be read by the subtask.
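
The local filtering step described above can be sketched roughly as follows. This is only an illustration under assumptions, not the connector's actual code: `LocalFilterSketch` and `partitionsFor` are hypothetical names, the topic "my-topic" is made up, and partitions are represented by their integer ids. Each simulated subtask runs the same assignment over all partitions and keeps only those whose returned index equals its own.

```java
import java.util.ArrayList;
import java.util.List;

public class LocalFilterSketch {

    // Same arithmetic as the assign method quoted in this thread, with a
    // topic string and partition id standing in for KafkaTopicPartition.
    public static int assign(String topic, int partition, int numParallelSubtasks) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        return (startIndex + partition) % numParallelSubtasks;
    }

    // What a single subtask does locally: look at every partition and keep
    // only the ones whose assigned index matches its own subtask index.
    public static List<Integer> partitionsFor(
            int subtaskIndex, String topic, int numPartitions, int parallelism) {
        List<Integer> mine = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            if (assign(topic, p, parallelism) == subtaskIndex) {
                mine.add(p);
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        int parallelism = 6;
        int numPartitions = 4;
        for (int subtask = 0; subtask < parallelism; subtask++) {
            List<Integer> mine = partitionsFor(subtask, "my-topic", numPartitions, parallelism);
            System.out.println("subtask " + subtask + " reads "
                    + (mine.isEmpty() ? "nothing (idle)" : "partitions " + mine));
        }
    }
}
```

With 4 partitions and a parallelism of 6, two of the simulated subtasks end up with an empty list, and every partition appears in exactly one list. Because all subtasks evaluate the same deterministic function, they reach this split without talking to each other.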

In reply to yunfan123's message of 8 November 2017, 6:38:54 PM.