Partitioning based on key flink kafka sink

Vishwas Siravara
Hi all,
I am using Flink 1.7.0 with this constructor:
FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig)
According to the docs, this constructor uses a fixed partitioner. I want to partition based on the key, so I tried this one instead:
public FlinkKafkaProducer(
String defaultTopicId,
KeyedSerializationSchema<IN> serializationSchema,
Properties producerConfig,
Optional<FlinkKafkaPartitioner<IN>> customPartitioner)
What should I pass for the Optional parameter? The Javadoc says:
@param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
* If a partitioner is not provided, records will be partitioned by the key of each record
* (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the keys
* are {@code null}, then records will be distributed to Kafka partitions in a
* round-robin fashion.
This is super confusing (contradictory, in a way), since the docs for the previous constructor say that a fixed partitioner will be used if customPartitioner is not present.

Best,
Vishwas 

Re: Partitioning based on key flink kafka sink

vino yang
Hi Vishwas,

You should pay attention to the other args.

The constructor you quoted takes a `KeyedSerializationSchema` argument, while the comment that confused you belongs to a constructor that takes only a `SerializationSchema` argument. That is the difference between them.
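
To make the three cases in the quoted Javadoc concrete, here is a minimal, Flink-free sketch of how the partition ends up being chosen. Everything in it is an assumption made for illustration: the `Partitioner` interface and `choosePartition` are hypothetical stand-ins, not Flink or Kafka APIs, and `Arrays.hashCode` is only a placeholder for the key hash that the Kafka client actually applies. Going by the Javadoc, "no partitioner provided" should correspond to passing `Optional.empty()` as `customPartitioner`, but please check that against the source of the Flink version you run.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

public class PartitionChoiceSketch {

    /** Hypothetical stand-in for a custom partitioner (not Flink's FlinkKafkaPartitioner). */
    interface Partitioner {
        int partition(byte[] key, int numPartitions);
    }

    // Counter used only for the "null key" round-robin case.
    private static final AtomicInteger ROUND_ROBIN = new AtomicInteger();

    /** Models the three cases described in the quoted Javadoc. */
    static int choosePartition(Optional<Partitioner> customPartitioner,
                               byte[] key,
                               int numPartitions) {
        if (customPartitioner.isPresent()) {
            // Case 1: a partitioner was supplied, so it decides.
            return customPartitioner.get().partition(key, numPartitions);
        }
        if (key != null) {
            // Case 2: no partitioner, non-null key: partition by key hash.
            // Placeholder hash; the real client uses its own hash function.
            return Math.floorMod(Arrays.hashCode(key), numPartitions);
        }
        // Case 3: no partitioner and a null key: round-robin.
        return Math.floorMod(ROUND_ROBIN.getAndIncrement(), numPartitions);
    }

    public static void main(String[] args) {
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);

        // Optional.empty(): the same key always maps to the same partition.
        int first = choosePartition(Optional.empty(), key, 4);
        int second = choosePartition(Optional.empty(), key, 4);
        System.out.println("keyed, stable: " + (first == second));

        // A "fixed" style partitioner ignores the key entirely.
        Partitioner alwaysZero = (k, n) -> 0;
        System.out.println("fixed: " + choosePartition(Optional.of(alwaysZero), key, 4));
    }
}
```

The point of the sketch is that the key is only consulted when the Optional is empty; any partitioner that is passed in, including a fixed one, takes precedence over the key.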

Best,
Vino

On Wed, Nov 6, 2019 at 9:16 AM, Vishwas Siravara <[hidden email]> wrote: