CREATE TABLE kafka_sink_table( warehouse_id INT, pack_task_order_id BIGINT, out_order_code STRING, pick_order_id BIGINT, end_time BIGINT WITH ( 'connector'='kafka', 'topic'='ods_wms_pack_task_order', 'properties.bootstrap.servers'='172.19.78.32:9092', 'format'='json' ); INSERT INTO kafka_sink_table SELECT ....... As describe here: https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/kafka.html I want to do partition according to warehouse_id. How should i write my customer partitioner? Is there any example? Thanks, Lei |
Hi Lei,
you can check how the FlinkFixedPartitioner [1] or Tuple2FlinkPartitioner [2] are implemented. Since you are using SQL connectors of the newest generation, you should receive an instance of org.apache.flink.table.data.RowData in your partitioner. You can create a Maven project with a flink-connector-kafka_2.11 provided dependency and create a JAR with the class file. You should then be able to pass the JAR to SQL as you pass other JARs. Regards, Timo [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java [2] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2FlinkPartitioner.java On 18.08.20 12:57, [hidden email] wrote: > > > CREATE TABLE kafka_sink_table( > warehouse_id INT, > pack_task_order_id BIGINT, > out_order_code STRING, > pick_order_id BIGINT, > end_time BIGINT > WITH ( > 'connector'='kafka', > 'topic'='ods_wms_pack_task_order', > 'properties.bootstrap.servers'='172.19.78.32:9092', > 'format'='json' > ); > > > INSERT INTO kafka_sink_table SELECT ....... > > As describe here: > https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/kafka.html > > I want to do partition according to warehouse_id. > > How should i write my customer partitioner? Is there any example? > > Thanks, > Lei > > ------------------------------------------------------------------------ > [hidden email] <mailto:[hidden email]> |
In reply to this post by wanglei2@geekplus.com
Hi Lei, If you want to write your custom partitioner, I think you can refer to the built-in FlinkFixedPartitioner[1] [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java Best, Xingbo
|
In reply to this post by wanglei2@geekplus.com
Hi, Lei ~
You may need to implement the abstract class FlinkKafkaPartitioner and then use the full class name as the param value of the option ‘sink.partitioner’. FlinkFixedPartitioner[1] is a good example there.
Best,
Danny Chan
在 2020年8月18日 +0800 PM8:18,[hidden email] <[hidden email]>,写道:
|
Free forum by Nabble | Edit this page |