Hi,

I am trying to author a SQL job that repartitions one Kafka SQL table into another Kafka SQL table. In this example the input and output tables have exactly the same SQL schema (see below) and data; the only difference is that the new Kafka stream needs to be repartitioned by a projected column such as item_id (the input stream is partitioned by user_id).

Is there a way to do this via SQL only, without using org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner? In other words, how can we express the stream key (keyBy) via the SQL layer? For instance, Hive exposes system columns called __key and __partition that can be used for this at the SQL layer (see https://github.com/apache/hive/tree/master/kafka-handler#table-definitions).

CREATE TABLE input_kafkaTable (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior_partition_by_uid',
  'properties.bootstrap.servers' = 'localhost:9092'
);

CREATE TABLE output_kafkaTable (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior_partition_by_iid',
  'properties.bootstrap.servers' = 'localhost:9092'
);

--
B-Slim
_______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______
Hi,

I'm pulling in some Flink SQL experts (in CC) to help you with this one :)

Cheers,
Gordon

On Tue, Nov 17, 2020 at 7:30 AM Slim Bouguerra <[hidden email]> wrote:
Hi Slim,

In 1.11, I think you have to implement a custom FlinkKafkaPartitioner and set its class name in the 'sink.partitioner' option.

In 1.12, you can re-partition the data by specifying the key fields (the Kafka producer partitions records by the message key by default). You can do this by adding a few additional options:

CREATE TABLE output_kafkaTable (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior_partition_by_iid',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.fields' = 'item_id',  -- specify which columns will be written to the message key
  'key.format' = 'raw',
  'value.format' = 'json'
);

Best,
Jark

On Tue, 17 Nov 2020 at 13:53, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
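For completeness, a minimal sketch of the statement that actually performs the repartitioning once both tables are declared (assuming the table names defined earlier in the thread). The SELECT keeps the schema unchanged; the sink's 'key.fields' option is what drives the new partitioning:

INSERT INTO output_kafkaTable
SELECT user_id, item_id, category_id, behavior, ts
FROM input_kafkaTable;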
Hi Jark,

Thanks very much. Will this work with Avro?

On Tue, Nov 17, 2020 at 07:44 Jark Wu <[hidden email]> wrote:
B-Slim
_______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______
Yes, it works with all the formats supported by the Kafka connector.

On Thu, 19 Nov 2020 at 10:18, Slim Bouguerra <[hidden email]> wrote:
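For example, a minimal sketch of an Avro variant of the sink definition above ('avro' is a standard Flink value format name; exact setup, e.g. schema-registry options, depends on the deployment). The key can stay 'raw', since only the key bytes feed the producer's partitioning hash:

CREATE TABLE output_kafkaTable (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior_partition_by_iid',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.fields' = 'item_id',
  'key.format' = 'raw',      -- raw bytes of item_id as the message key
  'value.format' = 'avro'    -- Avro-encoded message values
);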
Great, thanks!

On Wed, Nov 18, 2020 at 18:21 Jark Wu <[hidden email]> wrote:
B-Slim
_______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______