CREATE TABLE my_table ( ...

INSERT INTO mysql_sink_table SELECT id, first_name, last_name FROM my_table;

What will happen after I execute the INSERT SQL statement? For the update/delete messages from Kafka, will the corresponding records be updated or deleted in mysql_sink_table?

Thanks,
Lei
Hi Lei,

INSERT INTO jdbc_table SELECT * FROM changelog_table;

For the new connectors in Flink 1.11, you need to define a primary key for jdbc_table (and your MySQL table needs to have the corresponding primary key) because changelog_table contains "update" and "delete" records. The JDBC sink will then:
- use "INSERT INTO ... ON DUPLICATE KEY UPDATE ..." to deal with "insert" and "update" messages;
- use DELETE to deal with "delete" messages.

So generally speaking, with the primary key, this MySQL table will be the same as your source database table (the table that generates the changelog).

Best,
Jingsong
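To illustrate Jingsong's point, the statements the JDBC sink issues against MySQL look roughly like the following (a sketch assuming mysql_sink_table has id as its primary key; the exact SQL Flink generates depends on the JDBC dialect):

-- for "insert" and "update" messages: upsert on the primary key
INSERT INTO mysql_sink_table (id, first_name, last_name)
VALUES (?, ?, ?)
ON DUPLICATE KEY UPDATE first_name = VALUES(first_name), last_name = VALUES(last_name);

-- for "delete" messages: delete by the primary key
DELETE FROM mysql_sink_table WHERE id = ?;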
Hi Lei,
Jingsong is right, you need to define a primary key for your sink table. BTW, Flink uses `PRIMARY KEY NOT ENFORCED` to define the primary key because Flink doesn't own the data and only supports the `NOT ENFORCED` mode. It's a little different from a primary key in a database, which is `ENFORCED` by default; both `ENFORCED` and `NOT ENFORCED` are supported by the SQL standard. You can look up [1][2] for more details.

Best,
Leonard

[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table
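For reference, a minimal sketch of such a sink DDL with a `NOT ENFORCED` primary key (the column types and connection options here are placeholders, not taken from the thread; see [2] for the full option list):

CREATE TABLE jdbc_table (
  id BIGINT,
  first_name STRING,
  last_name STRING,
  PRIMARY KEY (id) NOT ENFORCED  -- declared for upsert semantics; Flink does not validate uniqueness
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydb',
  'table-name' = 'mysql_sink_table',
  'username' = '...',
  'password' = '...'
);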
In reply to this post by Jingsong Li
Thanks Jingsong,

Is there any document or example for this? I will build the flink-1.11 package and have a try.

Thanks,
Lei
Hi,

Welcome to try 1.11. There is no doc describing this directly, but I think these docs can help you: [1][2]

Best,
Jingsong
In reply to this post by wanglei2@geekplus.com.cn
CREATE TABLE t_pick_order (
  order_no VARCHAR,
  status INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'example',
  'scan.startup.mode' = 'latest-offset',
  'properties.bootstrap.servers' = '172.19.78.32:9092',
  'format' = 'canal-json'
)

CREATE TABLE order_status (
  order_no VARCHAR,
  status INT,
  PRIMARY KEY (order_no) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://xxx:3306/flink_test',
  'table-name' = 'order_status',
  'username' = 'dev',
  'password' = 'xxxx'
)

But when I execute

INSERT INTO order_status SELECT order_no, status FROM t_pick_order

there's an error:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Provided trait [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue. Current node is TableSourceScan(table=[[default_catalog, default_database, t_pick_order]], fields=[order_no, status])
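Aside from the planner error above: as Jingsong mentioned, the physical MySQL table that the 'order_status' sink writes to also needs the matching primary key. A minimal sketch of that table (the VARCHAR length is an assumption; match it to your actual data):

-- MySQL side, not Flink DDL
CREATE TABLE order_status (
  order_no VARCHAR(64) NOT NULL,
  status   INT,
  PRIMARY KEY (order_no)
);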
I have created an issue [1] and a pull request to fix this. Hope we can catch up with this release.

Best,
Jark