TableException


Pramit Vamsi
Hi,

I am attempting the following:

String sql = "INSERT INTO table3 "
        + "SELECT col1, col2, window_start_time, window_end_time, MAX(col3), MAX(col4), MAX(col5) FROM "
        + "(SELECT col1, col2, "
                + "TUMBLE_START(ts, INTERVAL '1' MINUTE) as window_start_time, "
                + "TUMBLE_END(ts, INTERVAL '1' MINUTE) as window_end_time, "
  ....
                + "FROM table1 "
                + "WHERE.... "
                + "GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), col1, col2 "
                + "UNION "
                + "SELECT col1, col2, "
                + "TUMBLE_START(ts, INTERVAL '1' MINUTE) as window_start_time, "
                + "TUMBLE_END(ts, INTERVAL '1' MINUTE) as window_end_time, "
....
                + "FROM table2 "
                + "WHERE ..... "
                + "GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), col1, col2) "
                + "GROUP BY window_start_time, window_end_time, col1, col2";

tableEnv.sqlUpdate(sql);

I am using JDBCAppendTableSink. 

Exception:
org.apache.flink.table.api.TableException: AppendStreamTableSink requires that Table has only insert changes.

What in the query should I fix? 

Re: TableException

JingsongLee
Hi Pramit:
AppendStreamTableSink defines an external TableSink to emit a streaming table with only insert changes. If the table is also modified by update or delete changes, a TableException will be thrown.[1]
Your SQL seems to produce update or delete changes.
You can try using RetractStreamTableSink or UpsertStreamTableSink instead. (Unfortunately, there is no Retract/Upsert JDBC sink yet, so you would have to implement one yourself.)
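
To illustrate why a query like this is not insert-only, here is a minimal sketch in plain Java with no Flink dependency. It simulates a non-windowed `GROUP BY key` with `MAX(value)` and records the changes such an aggregation has to emit: when a later row raises the maximum for a key, the previously emitted result must be retracted (`-`) before the new one is added (`+`). The class and method names (`RetractSketch`, `maxPerKey`, `isAppendOnly`) are hypothetical and only serve the illustration; this is not how Flink implements it internally. In the query above, the likely sources of such updates are the outer GROUP BY, which is not a window aggregation, and UNION, which deduplicates rows unlike UNION ALL.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RetractSketch {

    /**
     * Simulates "SELECT key, MAX(value) ... GROUP BY key" on an unbounded stream.
     * Returns the changelog: "+(k,v)" adds a result row, "-(k,v)" retracts one.
     */
    public static List<String> maxPerKey(String[] keys, int[] values) {
        Map<String, Integer> state = new HashMap<>(); // current max per key
        List<String> changelog = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            Integer old = state.get(keys[i]);
            if (old == null) {
                // First row for this key: a plain insert.
                state.put(keys[i], values[i]);
                changelog.add("+(" + keys[i] + "," + values[i] + ")");
            } else if (values[i] > old) {
                // The max changed: retract the old result, then emit the new one.
                state.put(keys[i], values[i]);
                changelog.add("-(" + keys[i] + "," + old + ")");
                changelog.add("+(" + keys[i] + "," + values[i] + ")");
            }
        }
        return changelog;
    }

    /** An append-only sink can only accept a changelog without retractions. */
    public static boolean isAppendOnly(List<String> changelog) {
        for (String change : changelog) {
            if (change.startsWith("-")) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> log = maxPerKey(new String[] {"a", "b", "a"}, new int[] {1, 5, 3});
        System.out.println(log);               // [+(a,1), +(b,5), -(a,1), +(a,3)]
        System.out.println(isAppendOnly(log)); // false
    }
}
```

A retract or upsert sink would apply the `-` messages as deletes or overwrite rows by key. Alternatively, it may be possible to keep the result append-only by combining the two inputs with UNION ALL and computing the MAX values in a single TUMBLE window aggregation over the combined rows; that is an untested assumption for this particular query.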



Best, JingsongLee

------------------------------------------------------------------
From:Pramit Vamsi <[hidden email]>
Send Time: June 13, 2019 (Thursday) 01:35
To:user <[hidden email]>
Subject:TableException
