I created one table on Kafka and another on MySQL using Flink SQL, then wrote a SQL statement that reads from Kafka and writes to MySQL: INSERT INTO mysqlTable SELECT status, COUNT(order_no) AS num FROM ... I think this is a retract stream. Where can I find the Java source code for the MySQL retract table sink? Thanks, Lei |
Hi, This can be an upsert stream [1]. Best, Jingsong Lee
|
Thanks Jingsong. When this SQL runs, records in the MySQL table can be deleted, so I guess it is a retract stream. I would like to see the exact Java code that is generated for it. Thanks, Lei
|
It seems to be here: https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc There is no JDBCRetractTableSink, only append and upsert. I am confused about why the MySQL record can be deleted. Thanks, Lei
|
Hi, This can be an upsert stream [1], and JDBC has an upsert sink now [2]. Best, Jingsong Lee On Wed, Mar 25, 2020 at 11:14 AM Jingsong Li <[hidden email]> wrote:
|
Hi, Maybe there is a misunderstanding about the upsert sink. Take a look at [1]; it can handle "delete" records. Best, Jingsong Lee On Wed, Mar 25, 2020 at 11:37 AM Jingsong Li <[hidden email]> wrote:
|
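To illustrate the point that an upsert sink can handle "delete" records, here is a minimal plain-Java sketch. It is not Flink's JDBC sink code: the class `UpsertSinkSketch`, its methods, and the in-memory map standing in for the MySQL table are all hypothetical. It only assumes that each change message carries a boolean flag, where true means "insert or overwrite the row for this key" and false means "delete the row for this key".

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Plain-Java illustration only; hypothetical names, not Flink's JDBC sink. */
final class UpsertSinkSketch {

    /** Stand-in for the MySQL table, keyed by the unique key (status). */
    private final Map<String, Long> table = new LinkedHashMap<>();

    /**
     * Applies one change message.
     * upsert == true  : insert or overwrite the row for this key.
     * upsert == false : delete the row for this key (num is ignored).
     */
    void apply(boolean upsert, String status, long num) {
        if (upsert) {
            table.put(status, num);
        } else {
            table.remove(status);
        }
    }

    /** Returns a copy of the current table contents. */
    Map<String, Long> snapshot() {
        return new LinkedHashMap<>(table);
    }

    public static void main(String[] args) {
        UpsertSinkSketch sink = new UpsertSinkSketch();
        sink.apply(true, "PAID", 1L);  // first result for PAID
        sink.apply(true, "PAID", 2L);  // updated result overwrites the row
        sink.apply(true, "NEW", 1L);   // first result for NEW
        sink.apply(false, "NEW", 0L);  // delete message removes the row
        System.out.println(sink.snapshot()); // prints {PAID=2}
    }
}
```

In this sketch a row disappears from the table without any retract sink being involved, which matches the behaviour Lei observed in MySQL.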
Thanks Jingsong. So JDBCTableSink now supports append and upsert modes, and retract mode is not available yet. Is that right? Thanks, Lei
|
That is not quite what I meant. There are two kinds of queries: append queries and update queries. For an update query, there are two ways to handle the changes: one is retract, the other is upsert. So a sink chooses one mode to handle update queries, and supporting just one of them is enough. You can read more in [1]. Best, Jingsong Lee
|
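The difference between the two ways of handling an update query can be sketched in plain Java. This is an illustration under stated assumptions, not code from Flink: `ChangelogEncodingSketch` and its methods are hypothetical, and messages are rendered as strings of the form (flag, key, count). It shows the same update to a grouped count (PAID going from 1 to 2) encoded once as retract messages and once as an upsert message.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration only; hypothetical names, not Flink classes. */
final class ChangelogEncodingSketch {

    /**
     * Retract encoding: an update becomes "retract the old row" followed by
     * "add the new row". oldCount is null when the key has no previous result.
     */
    static List<String> asRetract(String key, Long oldCount, long newCount) {
        List<String> out = new ArrayList<>();
        if (oldCount != null) {
            out.add("(false, " + key + ", " + oldCount + ")"); // retract previous result
        }
        out.add("(true, " + key + ", " + newCount + ")");      // add new result
        return out;
    }

    /**
     * Upsert encoding: one message per update; the unique key identifies
     * which row the sink should overwrite.
     */
    static List<String> asUpsert(String key, long newCount) {
        List<String> out = new ArrayList<>();
        out.add("(true, " + key + ", " + newCount + ")");
        return out;
    }

    public static void main(String[] args) {
        System.out.println(asRetract("PAID", 1L, 2L)); // [(false, PAID, 1), (true, PAID, 2)]
        System.out.println(asUpsert("PAID", 2L));      // [(true, PAID, 2)]
    }
}
```

Either encoding conveys the same result, which is why a sink only needs to support one of them. The upsert form needs a unique key on the result (here, the grouping column) but sends fewer messages.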