Hey,
I would like to use a round-robin Kafka partitioner in Apache Flink (the default one). I forked the code from Kafka's DefaultPartitioner class:

import java.io.Serializable;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.kafka.common.utils.Utils;

public class HashPartitioner<T> extends KafkaPartitioner<T> implements Serializable {

    private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());

    @Override
    public int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
        if (serializedKey == null) {
            // key is null: choose the next partition in round-robin fashion
            int nextValue = counter.getAndIncrement();
            return toPositive(nextValue) % numPartitions;
        } else {
            // hash the key bytes to choose a partition
            return toPositive(Utils.murmur2(serializedKey)) % numPartitions;
        }
    }

    // mask the sign bit so the result is always non-negative
    private static int toPositive(int number) {
        return number & 0x7fffffff;
    }
}

Is there a better way to do it?

Thanks,
Konstantin
Could you expand a bit more on what you
want to achieve?
(In particular, where do you want to use this partitioner: as an operation before a sink, or within a Kafka sink?)
Hi Chesnay,
Thanks for your reply. I would like to use the partitioner within the Kafka sink operation. By default the Kafka sink uses FixedPartitioner:

public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig) {
    this(topicId, serializationSchema, producerConfig, new FixedPartitioner<T>());
}

I have 12 Kafka topic partitions and 2 Flink partitions, so I get unbalanced partitioning. The Javadoc of the FixedPartitioner class says the following:

 * Not all Kafka partitions contain data
 * To avoid such an unbalanced partitioning, use a round-robin kafka partitioner. (note that this will
 * cause a lot of network connections between all the Flink instances and all the Kafka brokers

According to this, I have to use a round-robin Kafka partitioner. What is the right way to do it?

Thanks again.
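For context on why the fixed scheme leaves partitions empty: FixedPartitioner pins each parallel sink instance to a single Kafka partition. A simplified sketch of that mapping (illustrative only, not Flink's exact source):

// Simplified illustration of the fixed mapping: each parallel sink
// subtask always writes to the same Kafka partition.
static int fixedPartition(int parallelInstanceId, int[] partitions) {
    return partitions[parallelInstanceId % partitions.length];
}

With 2 sink subtasks and 12 topic partitions, only two of the twelve partitions ever receive data; the other ten stay empty.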
Hi!
You will have to modify your partitioner to implement the FlinkKafkaPartitioner interface instead. You can then plug it into any Kafka sink through one of the constructors.

Regards,
Chesnay
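A round-robin partitioner against that interface could look roughly like the sketch below (assuming Flink 1.3+, where the base class is org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; the class name RoundRobinPartitioner is made up for illustration):

import java.util.concurrent.atomic.AtomicInteger;

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

// Sketch of a round-robin partitioner (assumes the Flink 1.3+ interface).
public class RoundRobinPartitioner<T> extends FlinkKafkaPartitioner<T> {

    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // ignore the key and cycle through all partitions; mask the sign
        // bit so the index stays non-negative when the counter overflows
        int next = counter.getAndIncrement() & 0x7fffffff;
        return partitions[next % partitions.length];
    }
}

It would then be passed to a sink constructor that accepts a custom partitioner, e.g. new FlinkKafkaProducer010<>(topic, schema, props, new RoundRobinPartitioner<>()).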
Exactly, I did it like this. The only thing is that I am using Flink version 1.2.0, and in this version the class name is KafkaPartitioner. But the problem is that I would not like to "fork" Kafka's source code. (Please check my first comment.)

Thanks,
Konstantin
So you want to use the Kafka partitioner directly?
How about an adapter?

import java.io.Serializable;

public class KafkaPartitionerWrapper<T> extends KafkaPartitioner<T> implements Serializable {

    private final kafka.producer.Partitioner partitioner;

    public KafkaPartitionerWrapper(kafka.producer.Partitioner partitioner) {
        this.partitioner = partitioner;
    }

    @Override
    public int partition(T record, byte[] key, byte[] value, int numPartitions) {
        // delegate to the wrapped Kafka partitioner;
        // maybe pass Arrays.hashCode(key) instead
        return partitioner.partition(key, numPartitions);
    }
}
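Wiring it in might then look like this; the round-robin kafka.producer.Partitioner implementation below is hypothetical, and the topic, schema, and properties names are taken from earlier in the thread:

import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin implementation of kafka.producer.Partitioner,
// for illustration only.
public class RoundRobinKafkaPartitioner implements kafka.producer.Partitioner {

    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public int partition(Object key, int numPartitions) {
        // ignore the key and cycle through all partitions
        return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
    }
}

// Plugging it into the sink (Flink 1.2-style constructor):
outputStream.addSink(new FlinkKafkaProducer010<>(
        producerProperties.getProperty(TOPIC),
        new EventSerializationSchema(),
        producerProperties,
        new KafkaPartitionerWrapper<>(new RoundRobinKafkaPartitioner())));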
Thanks for your comment.
If I write the KafkaPartitioner anyway, I have to somehow pass the kafka.producer.Partitioner, which is not so easy. So I have found the easiest solution: just pass a null value:

outputStream.addSink(new FlinkKafkaProducer010<>(
        producerProperties.getProperty(TOPIC),
        new EventSerializationSchema(),
        producerProperties,
        null));

This means that FlinkKafkaProducer will automatically use Kafka's default partitioner.