How to write a custom sink partitioner when using the Flink SQL Kafka connector

4 messages
How to write a custom sink partitioner when using the Flink SQL Kafka connector

wanglei2@geekplus.com


CREATE TABLE kafka_sink_table(
 warehouse_id INT,
 pack_task_order_id BIGINT,
 out_order_code STRING,
 pick_order_id BIGINT,
 end_time BIGINT
) WITH (
 'connector'='kafka',
 'topic'='ods_wms_pack_task_order',
 'properties.bootstrap.servers'='172.19.78.32:9092',
 'format'='json'
);


INSERT INTO kafka_sink_table SELECT .......

I want to partition the records according to warehouse_id.

How should I write my custom partitioner? Is there any example?

Thanks,
Lei


Re: How to write a custom sink partitioner when using the Flink SQL Kafka connector

Timo Walther
Hi Lei,

You can check how FlinkFixedPartitioner [1] or Tuple2FlinkPartitioner [2] is implemented. Since you are using SQL connectors of the newest generation, your partitioner will receive instances of org.apache.flink.table.data.RowData.

You can create a Maven project with flink-connector-kafka_2.11 as a provided dependency and build a JAR containing the class file. You should then be able to pass that JAR to SQL in the same way as other JARs.

Regards,
Timo

[1]
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
[2]
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2FlinkPartitioner.java
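As a rough illustration (not from this thread): a real partitioner would subclass FlinkKafkaPartitioner&lt;RowData&gt; and put its logic inside partition(RowData record, byte[] key, byte[] value, String targetTopic, int[] partitions), reading the warehouse id via record.getInt(0) (the field position of warehouse_id in the sink table). Since that class only compiles against the Flink Kafka connector, the sketch below shows just the key-to-partition mapping as plain, self-contained Java; the class and method names are hypothetical.

```java
// Hypothetical sketch of the key-to-partition mapping a custom
// FlinkKafkaPartitioner<RowData> could use. In the real subclass this
// logic would live inside partition(record, key, value, targetTopic,
// partitions), with warehouseId obtained via record.getInt(0).
public class WarehouseIdPartitioning {

    // Maps a warehouse id onto one of the available Kafka partitions.
    // Math.floorMod keeps the index non-negative even for negative ids,
    // so all records of one warehouse always land in the same partition.
    public static int selectPartition(int warehouseId, int[] partitions) {
        if (partitions == null || partitions.length == 0) {
            throw new IllegalArgumentException("partitions must not be empty");
        }
        return partitions[Math.floorMod(warehouseId, partitions.length)];
    }

    public static void main(String[] args) {
        int[] partitions = {0, 1, 2, 3};
        // Deterministic: warehouse 42 always maps to partition 42 % 4 = 2.
        System.out.println(selectPartition(42, partitions));
    }
}
```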


On 18.08.20 12:57, [hidden email] wrote:

>
>
> CREATE TABLE kafka_sink_table(
>   warehouse_id INT,
>   pack_task_order_id BIGINT,
>   out_order_code STRING,
>   pick_order_id BIGINT,
>   end_time BIGINT
> ) WITH (
>   'connector'='kafka',
>   'topic'='ods_wms_pack_task_order',
>   'properties.bootstrap.servers'='172.19.78.32:9092',
>   'format'='json'
> );
>
>
> INSERT INTO kafka_sink_table SELECT .......
>
> As described here:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/kafka.html
>
> I want to partition the records according to warehouse_id.
>
> How should I write my custom partitioner? Is there any example?
>
> Thanks,
> Lei
>

Re: How to write a custom sink partitioner when using the Flink SQL Kafka connector

Xingbo Huang
In reply to this post by wanglei2@geekplus.com
Hi Lei,

If you want to write a custom partitioner, you can refer to the built-in FlinkFixedPartitioner [1].

[1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java

Best,
Xingbo



Re: How to write a custom sink partitioner when using the Flink SQL Kafka connector

Danny Chan
In reply to this post by wanglei2@geekplus.com
Hi, Lei ~
You may need to implement the abstract class FlinkKafkaPartitioner and then use the fully qualified class name as the value of the 'sink.partitioner' option. The built-in FlinkFixedPartitioner [1] is a good example.

[1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
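For example, the original DDL could reference the partitioner like this (a sketch: com.example.WarehouseIdPartitioner is a hypothetical class name, and the JAR containing it must be on the classpath):

```sql
CREATE TABLE kafka_sink_table(
 warehouse_id INT,
 pack_task_order_id BIGINT,
 out_order_code STRING,
 pick_order_id BIGINT,
 end_time BIGINT
) WITH (
 'connector'='kafka',
 'topic'='ods_wms_pack_task_order',
 'properties.bootstrap.servers'='172.19.78.32:9092',
 'format'='json',
 -- hypothetical fully qualified class name of the custom partitioner
 'sink.partitioner'='com.example.WarehouseIdPartitioner'
);
```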


Best,
Danny Chan