Kafka SQL table Re-partition via Flink SQL

Kafka SQL table Re-partition via Flink SQL

Slim Bouguerra
Hi,
I am trying to author a SQL job that repartitions one Kafka SQL table into another Kafka SQL table.
As an example, the input and output tables have exactly the same SQL schema (see below) and data; the only difference is that the new Kafka stream needs to be repartitioned by a simple projection such as item_id (the input stream is partitioned by user_id).
Is there a way to do this via SQL only, without implementing an org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner?

In other words, how can we express the stream key (keyBy) via the SQL layer?

For instance, Hive exposes system columns called __key and __partition that can be used for this via the SQL layer (see https://github.com/apache/hive/tree/master/kafka-handler#table-definitions).

CREATE TABLE input_kafkaTable (
 user_id BIGINT,
 item_id BIGINT,
 category_id BIGINT,
 behavior STRING,
 ts TIMESTAMP(3)
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior_partition_by_uid',
 'properties.bootstrap.servers' = 'localhost:9092',
 'format' = 'json'  -- a value format is required by the Kafka connector; 'json' assumed here
);

CREATE TABLE output_kafkaTable (
 user_id BIGINT,
 item_id BIGINT,
 category_id BIGINT,
 behavior STRING,
 ts TIMESTAMP(3)
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior_partition_by_iid',
 'properties.bootstrap.servers' = 'localhost:9092',
 'format' = 'json'  -- a value format is required by the Kafka connector; 'json' assumed here
);



--

B-Slim
_______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______

Re: Kafka SQL table Re-partition via Flink SQL

Tzu-Li (Gordon) Tai
Hi,

I'm pulling in some Flink SQL experts (in CC) to help you with this one :)

Cheers,
Gordon


Re: Kafka SQL table Re-partition via Flink SQL

Jark Wu-3
Hi Slim,

In 1.11, I think you have to implement a custom FlinkKafkaPartitioner and set its class name via the 'sink.partitioner' option.
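
A minimal sketch of wiring that up (the class name com.example.ItemIdPartitioner is hypothetical; it must point to a FlinkKafkaPartitioner implementation that is on the job classpath):

CREATE TABLE output_kafkaTable (
 user_id BIGINT,
 item_id BIGINT,
 category_id BIGINT,
 behavior STRING,
 ts TIMESTAMP(3)
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior_partition_by_iid',
 'properties.bootstrap.servers' = 'localhost:9092',
 'sink.partitioner' = 'com.example.ItemIdPartitioner',  -- hypothetical custom partitioner class
 'format' = 'json'
);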

In 1.12, you can re-partition the data by specifying key fields (the Kafka producer partitions records by the message key by default). You can do this by adding a few additional options:

CREATE TABLE output_kafkaTable (
 user_id BIGINT,
 item_id BIGINT,
 category_id BIGINT,
 behavior STRING,
 ts TIMESTAMP(3)
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior_partition_by_iid',
 'properties.bootstrap.servers' = 'localhost:9092',
 'key.fields' = 'item_id',  -- specify which columns will be written to message key
 'key.format' = 'raw',
 'value.format' = 'json' 
);
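
The repartitioning job itself is then a plain INSERT from the source table (assuming input_kafkaTable declares a value format such as 'json'):

INSERT INTO output_kafkaTable
SELECT user_id, item_id, category_id, behavior, ts
FROM input_kafkaTable;

The sink serializes item_id as the message key with the 'raw' format, and the default Kafka producer partitioner hashes that key, so all events with the same item_id land in the same partition.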


Best,
Jark




Re: Kafka SQL table Re-partition via Flink SQL

Slim Bouguerra
Hi Jark
Thanks very much. Will this work with Avro?

--

B-Slim
_______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______

Re: Kafka SQL table Re-partition via Flink SQL

Jark Wu-3
Yes, it works with all the formats supported by the Kafka connector.
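
For example, an Avro variant of the table above would be a sketch like this (it assumes the flink-avro dependency is on the classpath):

CREATE TABLE output_kafkaTable (
 user_id BIGINT,
 item_id BIGINT,
 category_id BIGINT,
 behavior STRING,
 ts TIMESTAMP(3)
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior_partition_by_iid',
 'properties.bootstrap.servers' = 'localhost:9092',
 'key.fields' = 'item_id',
 'key.format' = 'avro',   -- key serialized as an Avro record instead of raw bytes
 'value.format' = 'avro'
);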


Re: Kafka SQL table Re-partition via Flink SQL

Slim Bouguerra
Great, thanks!

--

B-Slim
_______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______