flink1.9.1 RetractStream insert to mysql problem

Polarisary
Hi,
I am using Flink 1.9.1 with the following SQL:
----------------------------------------
INSERT INTO a
SELECT c1, c2, c3, c4
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY c1, c2, c3 ORDER BY c4 DESC) AS rownum
FROM t)
WHERE rownum <= 1
----------------------------------------
This query produces a retract stream, but Flink's JDBCUpsertTableSink is an UpsertStreamTableSink, so the query throws the following exception:
----------------------------------------
Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:114)

----------------------------------------
My question is: how can I insert this result into MySQL?

Re: flink1.9.1 RetractStream insert to mysql problem

Jingsong Li
Hi Polarisary,

Look at the semantics your SQL expresses: it is a Top-N query, and more precisely a Top 1 query.
- A Top-N query produces a stream whose primary key includes the row number, but your SQL does not select the row number, so the result has no primary key.
- UpsertStreamTableSink requires a primary key, hence the exception.

I suggest the following solutions:
- Solution 1: select the row number as well, so that the Top-N SQL produces a primary key.
- Solution 2: wrap the query as "select c1, c2, c3, c4 from 'Top N SQL' group by c1, c2, c3, c4", which turns it into a query with a primary key.
- Solution 3: optimize the blink planner so that it produces a primary key without the row number in the special Top 1 case. I created [1] to improve this.
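
Solutions 1 and 2 could be sketched roughly as follows. This is only an illustration under assumptions: the sink name a_with_rownum and its extra rownum column are hypothetical (the original sink a has only four columns), and the two statements are meant to be submitted separately.

```sql
-- Solution 1 (sketch): keep rownum in the output so the Top-N result
-- carries the row number as part of its key.
-- Assumption: a_with_rownum is a hypothetical sink with columns
-- (c1, c2, c3, c4, rownum); it is not part of the original post.
INSERT INTO a_with_rownum
SELECT c1, c2, c3, c4, rownum
FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY c1, c2, c3 ORDER BY c4 DESC) AS rownum
  FROM t)
WHERE rownum <= 1

-- Solution 2 (sketch): wrap the Top-N query in a GROUP BY over all
-- selected columns, so the grouping columns act as the key.
INSERT INTO a
SELECT c1, c2, c3, c4
FROM (
  SELECT c1, c2, c3, c4
  FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY c1, c2, c3 ORDER BY c4 DESC) AS rownum
    FROM t)
  WHERE rownum <= 1)
GROUP BY c1, c2, c3, c4
```

In either case the MySQL table should presumably have a unique or primary key on the same columns, so that the upsert writes can match existing rows.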


Best,
Jingsong Lee

(In reply to Polarisary's message of Mon, Jan 20, 2020 at 9:58 AM.)