FlinkSQL Upsert/Retraction 写入 MySQL 的问题

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

FlinkSQL Upsert/Retraction 写入 MySQL 的问题

wanglei2@geekplus.com.cn


INSERT INTO  mysql_sink SELECT  f1, count(*) FROM kafka_src GROUP BY f1

每从 kafka 过来一条新的记录,会生成两条记录 Tuple2<Row, Boolean>, 旧的被删除,新的会添加上。
但这个 Sink 到底是用到了 UpsertStream 还是 RetractStream 呢,怎么判断是 UpsertStream 还是 RetractStream 呢?

实际上使用了 JDBCUpsertTableSink.java 的代码写入 MySQL 吗?



如若不带 group by 直接:
INSERT INTO  mysql_sink SELECT  f1,  f2 FROM kafka_src
主键冲突写入 mysql 是会出错的,怎么可以用 Upsert 的方式直接覆盖呢?




Reply | Threaded
Open this post in threaded view
|

Re: FlinkSQL Upsert/Retraction 写入 MySQL 的问题

Leonard Xu
Hi, wanglei

PLEASE use English when send mails to user([hidden email]) mail list.
You should send to to user-zh([hidden email]) mail list,
and I’m pleasure to answer the question here.

Best,
Leonard Xu

在 2020年4月27日,12:14,[hidden email] 写道:



INSERT INTO  mysql_sink SELECT  f1, count(*) FROM kafka_src GROUP BY f1

每从 kafka 过来一条新的记录,会生成两条记录 Tuple2<Row, Boolean>, 旧的被删除,新的会添加上。
但这个 Sink 到底是用到了 UpsertStream 还是 RetractStream 呢,怎么判断是 UpsertStream 还是 RetractStream 呢?

实际上使用了 JDBCUpsertTableSink.java 的代码写入 MySQL 吗?



如若不带 group by 直接:
INSERT INTO  mysql_sink SELECT  f1,  f2 FROM kafka_src
主键冲突写入 mysql 是会出错的,怎么可以用 Upsert 的方式直接覆盖呢?