Hi,
I use flink 1.9.1, sql as follows,
----------------------------------------
INSERT INTO a
SELECT c1, c2, c3, c4
FROM (
SELECT *,ROW_NUMBER() OVER (PARTITION BY c1, c2, c3 ORDER BY c4 DESC) AS rownum" +
FROM t)
WHERE rownum <= 1
----------------------------------------
This sql,returns RetractStream, but flink JDBCUpsertTableSink is UpsertStreamTableSink, so this sql throw Exception as follows:
----------------------------------------
Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:114)
----------------------------------------
My problem is how to insert to mysql.