Flip-105 can the debezium/canal SQL sink to database directly?


wanglei2@geekplus.com.cn

CREATE TABLE my_table (
  id BIGINT,
  first_name STRING,
  last_name STRING,
  email STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json'
);

INSERT INTO mysql_sink_table SELECT id, first_name, last_name FROM my_table;


What will happen after I execute the INSERT SQL statement? For the update/delete messages from Kafka, will the corresponding records be updated or deleted in mysql_sink_table?


Thanks,

Lei




Re: Flip-105 can the debezium/canal SQL sink to database directly?

Jingsong Li
Hi Lei,

INSERT INTO  jdbc_table SELECT * FROM changelog_table;

For Flink 1.11's new connectors, you need to define a primary key for jdbc_table (and your MySQL table needs the corresponding primary key as well), because changelog_table contains "update" and "delete" records.

And then, the JDBC sink will:
- use "INSERT INTO ... ON DUPLICATE KEY UPDATE ..." to deal with "insert" and "update" messages;
- use DELETE to deal with "delete" messages.

So generally speaking, with the primary key, this MySQL table will stay the same as your source database table (the table generating the changelog).
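Conceptually, the behavior described above can be sketched as follows. This is a hypothetical Python illustration, not Flink code: each changelog event is applied to a keyed table, with "c"/"u" upserting by primary key (mirroring "INSERT ... ON DUPLICATE KEY UPDATE") and "d" deleting by key.

```python
# Hypothetical sketch (not Flink code): how an upsert sink with a primary
# key materializes a Debezium-style changelog into a table.
def materialize(changelog, key_field):
    table = {}  # primary key -> latest row
    for op, row in changelog:
        key = row[key_field]
        if op in ("c", "u"):      # create / update: upsert by key
            table[key] = row
        elif op == "d":           # delete: remove by key
            table.pop(key, None)
    return table

events = [
    ("c", {"id": 1, "first_name": "Ann"}),
    ("c", {"id": 2, "first_name": "Bob"}),
    ("u", {"id": 1, "first_name": "Anna"}),  # update id=1
    ("d", {"id": 2, "first_name": "Bob"}),   # delete id=2
]
print(materialize(events, "id"))
```

After replaying the whole changelog, the materialized table converges to the current state of the source table, which is exactly the "same as your source database table" point above.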

Best,
Jingsong


--
Best, Jingsong Lee

Re: Flip-105 can the debezium/canal SQL sink to database directly?

Leonard Xu
Hi Lei,
Jingsong is right, you need to define a primary key for your sink table.
BTW, Flink uses `PRIMARY KEY ... NOT ENFORCED` to define a primary key because Flink doesn't own the data and only supports the `NOT ENFORCED` mode. This is a little different from a primary key in a database, which is `ENFORCED` by default; both `ENFORCED` and `NOT ENFORCED` are supported in the SQL standard.
You can look up [1][2] for more details.
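The ENFORCED vs. NOT ENFORCED distinction can be illustrated with a small hypothetical sketch (plain Python, not the Flink API): an enforced key rejects a second row with the same key, while NOT ENFORCED trusts the user/query to maintain key integrity and simply keeps the latest row per key.

```python
# Hypothetical illustration (not Flink code) of the two constraint modes.
class EnforcedTable:
    """Checks the constraint: a duplicate key is an error."""
    def __init__(self):
        self.rows = {}
    def insert(self, key, row):
        if key in self.rows:
            raise ValueError(f"duplicate primary key: {key}")
        self.rows[key] = row

class NotEnforcedTable:
    """No check: key integrity is the user's responsibility; last write wins."""
    def __init__(self):
        self.rows = {}
    def insert(self, key, row):
        self.rows[key] = row

enforced, relaxed = EnforcedTable(), NotEnforcedTable()
enforced.insert(1, "a")
relaxed.insert(1, "a")
relaxed.insert(1, "b")        # accepted; row for key 1 is overwritten
try:
    enforced.insert(1, "b")   # rejected
except ValueError as e:
    print(e)
```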

Best,
Leonard
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table



Re: Re: Flip-105 can the debezium/canal SQL sink to database directly?

wanglei2@geekplus.com.cn
Thanks Jingsong,

Is there any document or example for this?
I will build the flink-1.11 package and have a try.

Thanks,
Lei  


 

Re: Re: Flip-105 can the debezium/canal SQL sink to database directly?

Jingsong Li
Hi,

Welcome to try 1.11.

There is no doc that describes this directly, but I think these docs can help you [1][2].


Best,
Jingsong



--
Best, Jingsong Lee

Re: Re: Flip-105 can the debezium/canal SQL sink to database directly?

wanglei2@geekplus.com.cn
CREATE TABLE t_pick_order (
  order_no VARCHAR,
  status INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'example',
  'scan.startup.mode' = 'latest-offset',
  'properties.bootstrap.servers' = '172.19.78.32:9092',
  'format' = 'canal-json'
);

CREATE TABLE order_status (
  order_no VARCHAR,
  status INT,
  PRIMARY KEY (order_no) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://xxx:3306/flink_test',
  'table-name' = 'order_status',
  'username' = 'dev',
  'password' = 'xxxx'
);


But when I execute `INSERT INTO order_status SELECT order_no, status FROM t_pick_order`
there's an error:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Provided trait [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue. 
Current node is TableSourceScan(table=[[default_catalog, default_database, t_pick_order]], fields=[order_no, status])
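For context, the two "traits" in this error describe changelog update kinds: a canal/debezium source emits both an UPDATE_BEFORE (retraction) and an UPDATE_AFTER row per update, while an upsert sink keyed on the primary key only needs the UPDATE_AFTER row. A hypothetical Python sketch (not planner code) of the conversion the planner should perform when the primary key is present:

```python
# Hypothetical sketch: converting a BEFORE_AND_AFTER changelog to
# ONLY_UPDATE_AFTER. With a primary key at the sink, UPDATE_BEFORE ("-U")
# rows are redundant: the "+U" row alone overwrites the old row for that key.
def to_only_update_after(changelog):
    return [(op, row) for op, row in changelog if op != "-U"]

stream = [
    ("+I", {"order_no": "A1", "status": 0}),
    ("-U", {"order_no": "A1", "status": 0}),  # update_before: dropped
    ("+U", {"order_no": "A1", "status": 1}),  # update_after: kept
]
print(to_only_update_after(stream))
```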



Date: 2020-06-30 20:25
Subject: Re: Re: Flip-105 can the debezium/canal SQL sink to database directly?
Hi, wanglei2 ~

For the primary key syntax, you can reference [1] for the "PRIMARY KEY" part; note that currently we only support the NOT ENFORCED mode. Here is the reason:

>SQL standard specifies that a constraint can either be ENFORCED or NOT ENFORCED. This controls if the constraint checks are performed on the incoming/outgoing data. Flink does not own the data therefore the only mode we want to support is the NOT ENFORCED mode. It is up to the user to ensure that the query enforces key integrity.

For DDL to create JDBC table, you can reference [2]

[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table

Best,
Danny Chan

Re: Re: Flip-105 can the debezium/canal SQL sink to database directly?

Jingsong Li

Best,
Jingsong

--
Best, Jingsong Lee

Re: Re: Flip-105 can the debezium/canal SQL sink to database directly?

Jark Wu-3
I have created an issue [1] and a pull request to fix this. Hope we can catch up with this release. 

Best,
Jark

