Use a round-robin kafka partitioner


kla
Hey,

I would like to use a round-robin Kafka partitioner in Apache Flink
(Kafka's default one).

I forked the code from Kafka's DefaultPartitioner class:

import java.io.Serializable;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.utils.Utils;

public class HashPartitioner<T> extends KafkaPartitioner<T> implements Serializable {

    private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());

    @Override
    public int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
        if (serializedKey == null) {
            // key is null: choose the partition in round-robin fashion
            int nextValue = counter.getAndIncrement();
            return toPositive(nextValue) % numPartitions;
        } else {
            // hash the key bytes to choose a partition
            return toPositive(Utils.murmur2(serializedKey)) % numPartitions;
        }
    }

    // mask the sign bit instead of Math.abs, which overflows for Integer.MIN_VALUE
    private static int toPositive(int number) {
        return number & 0x7fffffff;
    }
}
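As a side note on the toPositive helper above: the bitmask is used rather than Math.abs because Math.abs(Integer.MIN_VALUE) overflows and stays negative, which would produce a negative partition index. A minimal standalone sketch (class name is my own, for illustration only):

```java
public class ToPositiveDemo {

    // same helper as in the partitioner above: clears the sign bit,
    // so the result is always in [0, Integer.MAX_VALUE]
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    public static void main(String[] args) {
        System.out.println(Math.abs(Integer.MIN_VALUE));   // -2147483648: abs overflows
        System.out.println(toPositive(Integer.MIN_VALUE)); // 0: mask stays non-negative
    }
}
```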

Is there a better way to do it?

Thanks,
Konstantin



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Use a round-robin kafka partitioner

Chesnay Schepler
Could you expand a bit more on what you want to achieve?
(In particular, where you want to use this partitioner: as an operation before a sink,
or within a Kafka sink?)

On 24.10.2017 09:24, kla wrote:



Re: Use a round-robin kafka partitioner

kla
Hi Chesnay,

Thanks for your reply.

I would like to use the partitioner within the Kafka Sink operation.

By default the Kafka sink uses the FixedPartitioner:

    public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig) {
        this(topicId, serializationSchema, producerConfig, new FixedPartitioner<T>());
    }

I have 12 Kafka topic partitions and 2 Flink sink partitions, so I get
unbalanced partitioning.
According to the javadoc of the FixedPartitioner class:

 *  Not all Kafka partitions contain data.
 *  To avoid such an unbalanced partitioning, use a round-robin kafka
 *  partitioner (note that this will cause a lot of network connections
 *  between all the Flink instances and all the Kafka brokers).

According to this I have to use a round-robin Kafka partitioner. What is
the right way to do it?
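To make the imbalance concrete, here is a small sketch. It assumes, per the FixedPartitioner javadoc, that each parallel sink instance writes to the single partition at index (subtaskIndex % numPartitions); the class and helper names are mine, for illustration only:

```java
public class FixedPartitionerImbalance {

    // hypothetical helper mirroring FixedPartitioner's assumed assignment rule:
    // each sink subtask is pinned to exactly one Kafka partition
    static int assignedPartition(int subtaskIndex, int numKafkaPartitions) {
        return subtaskIndex % numKafkaPartitions;
    }

    public static void main(String[] args) {
        int flinkParallelism = 2;
        int kafkaPartitions = 12;
        for (int subtask = 0; subtask < flinkParallelism; subtask++) {
            System.out.println("subtask " + subtask + " -> partition "
                    + assignedPartition(subtask, kafkaPartitions));
        }
        // with parallelism 2, only partitions 0 and 1 ever receive data;
        // the other 10 partitions stay empty
    }
}
```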

Thanks again.




Re: Use a round-robin kafka partitioner

Chesnay Schepler
Hi!

You will have to modify your partitioner to implement the FlinkKafkaPartitioner interface instead.
You can then plug it into any Kafka sink through one of the constructors.

Regards,
Chesnay
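A minimal round-robin partitioner along these lines might look as follows. This is a sketch: the class name and the deterministic starting counter are my own choices, and it is shown as a standalone class so the logic is self-contained; in a real job it would extend FlinkKafkaPartitioner<T> from the Flink Kafka connector (assumed on the classpath) and be passed to the FlinkKafkaProducer010 constructor:

```java
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of a round-robin partitioner; in a real job this would
// extend FlinkKafkaPartitioner<T> and override its partition method
public class RoundRobinPartitioner<T> implements Serializable {

    // start at 0 for a deterministic cycle (Kafka's DefaultPartitioner seeds randomly)
    private final AtomicInteger counter = new AtomicInteger(0);

    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // mask the sign bit so the index stays non-negative after counter overflow
        int next = counter.getAndIncrement() & 0x7fffffff;
        return partitions[next % partitions.length];
    }
}
```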

On 24.10.2017 22:15, kla wrote:



Re: Use a round-robin kafka partitioner

kla
Exactly, I did it like this; the only thing is that I am using Flink version
1.2.0, and in this version the class name is KafkaPartitioner.

But the problem is that I would prefer not to "fork" Kafka's source code.
(Please check my first comment.)

Thanks,
Konstantin




Re: Use a round-robin kafka partitioner

Chesnay Schepler
So you want to use the kafka partitioner directly?

How about an adapter?
public class KafkaPartitionerWrapper<T> extends KafkaPartitioner<T> implements Serializable {

    private final kafka.producer.Partitioner partitioner;

    public KafkaPartitionerWrapper(kafka.producer.Partitioner partitioner) {
        this.partitioner = partitioner;
    }

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // maybe pass Arrays.hashCode(key) instead
        return partitioner.partition(key, partitions.length);
    }
}
On 25.10.2017 09:58, kla wrote:



Re: Use a round-robin kafka partitioner

kla
Thanks for your comment.
If I write the KafkaPartitioner anyway, I have to somehow pass the
*kafka.producer.Partitioner*, which is not so easy.

So I have found the easiest solution for this: just pass a *null* value:

outputStream.addSink(new FlinkKafkaProducer010<>(
        producerProperties.getProperty(TOPIC),
        new EventSerializationSchema(),
        producerProperties,
        null));

This means that the *FlinkKafkaProducer* will automatically use Kafka's
default partitioner.


