Flink requires a table key when inserting into an upsert table sink

徐涛
Hi All,
I am using Flink 1.6 to build some real-time programs, and I want to write the output to a table sink. The code is below. At first I used an append table sink, but the error message told me to use an upsert table sink, so I wrote one. Now a different error blocks me:

“Caused by: org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.”

My question is: how can I set the table's keys in this scenario? I also checked the exception stack trace and found that the system infers the key fields with

val tableKeys: Option[Array[String]] = UpdatingPlanChecker.getUniqueKeyFields(optimizedPlan)

How can I make this function return the key fields instead of None?
Thanks a lot!
// hll: a user-defined aggregate function, assumed to be registered elsewhere (not shown)
val praise = tableEnv.sqlQuery("SELECT article_id, hll(uid) AS PU FROM praise GROUP BY HOP(updated_time, INTERVAL '1' MINUTE, INTERVAL '3' DAY), article_id")
tableEnv.registerTable("praiseAggr", praise)

val comment = tableEnv.sqlQuery("SELECT article_id, hll(from_uid) AS CU FROM comment GROUP BY HOP(updated_time, INTERVAL '1' MINUTE, INTERVAL '3' DAY), article_id")
tableEnv.registerTable("commentAggr", comment)

val reader = tableEnv.sqlQuery("SELECT article_id, hll(uid) AS RU FROM reader GROUP BY HOP(updated_time, INTERVAL '1' MINUTE, INTERVAL '3' DAY), article_id")
tableEnv.registerTable("readerAggr", reader)

// sqlUpdate returns Unit, so there is no result to assign
tableEnv.sqlUpdate(
  s"INSERT INTO $sinkTableName " +
  "SELECT p.article_id, p.PU, c.CU, r.RU FROM praiseAggr p " +
  "FULL OUTER JOIN commentAggr c ON p.article_id = c.article_id " +
  "FULL OUTER JOIN readerAggr r ON c.article_id = r.article_id")

Thanks,
Henry Xu

Re: Flink requires a table key when inserting into an upsert table sink

Fabian Hueske-2
Hi Henry,

The problem is that the table resulting from the query does not have a unique key.
You can only use an upsert sink if the table has a (possibly composite) unique key. Since this is not the case, you cannot use an upsert sink.
However, you can implement a RetractStreamTableSink, which can write any kind of table (append-only or updating, keyed or non-keyed) to an external system.
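Why a retract sink needs no key can be illustrated with a toy model in plain Scala. This is not Flink's RetractStreamTableSink API, and the names RetractSinkModel, Row, and applyMessages are hypothetical. It only borrows the message shape of a retract stream: a (flag, row) pair where true adds a row and false deletes one earlier copy of the same row. Because each retraction names the whole row it removes, the sink can maintain the external table without knowing any key.

```scala
// Toy model in plain Scala; this is NOT Flink's RetractStreamTableSink API.
// Each message is (flag, row): true adds the row, false deletes one earlier copy of it.
object RetractSinkModel {
  // Hypothetical row shape: (article_id, PU, CU); None plays the role of SQL NULL.
  type Row = (String, Option[Long], Option[Long])

  // Applies retract messages to a bag (row -> count) standing in for the external table.
  // No key is needed: a retraction names the whole row it removes.
  def applyMessages(msgs: Seq[(Boolean, Row)]): Map[Row, Int] =
    msgs.foldLeft(Map.empty[Row, Int]) {
      case (table, (true, row)) =>
        table.updated(row, table.getOrElse(row, 0) + 1)
      case (table, (false, row)) =>
        table.getOrElse(row, 0) match {
          case 0 => throw new IllegalStateException(s"retraction of unknown row $row")
          case 1 => table - row
          case n => table.updated(row, n - 1)
        }
    }
}
```

For example, the messages (true, (a1, 3, NULL)), (false, (a1, 3, NULL)), (true, (a1, 3, 5)) leave only the row (a1, 3, 5) in the table.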

Best, Fabian

(In reply to 徐涛's message of 2018-08-10 17:06 GMT+02:00.)

Re: Flink requires a table key when inserting into an upsert table sink

徐涛
Hi Fabian,
Could you give an example of a query that has a unique key?
What mechanism does Flink use to infer which field(s) form the unique key?
Thanks a lot!

Best, Henry

(In reply to Fabian Hueske's message of 11 Aug 2018, 5:21 AM.)

Re: Flink requires a table key when inserting into an upsert table sink

Hequn Cheng
Hi, 

Could you give an example of a query that has a unique key?

Consider the following SQL:

SELECT a, SUM(b) as d
FROM Orders
GROUP BY a
 
The result table has a unique key: a. The documentation on Streaming Concepts [1] may be helpful.
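The following toy model in plain Scala shows why a is a unique key of this result. It is not Flink's runtime, and the names UpsertModel, groupBySum, and upsertInto are hypothetical. Every input row updates the running sum for its group and emits the new (a, d) pair. Because each a appears in at most one result row, a sink can overwrite the previous value for that a in place, which is exactly what an upsert sink relies on.

```scala
// Toy model in plain Scala (not Flink's runtime) of:
//   SELECT a, SUM(b) AS d FROM Orders GROUP BY a
object UpsertModel {
  // Each input row (a, b) updates the running sum of group a and emits the new (a, d).
  def groupBySum(orders: Seq[(String, Long)]): Seq[(String, Long)] = {
    val (_, emitted) =
      orders.foldLeft((Map.empty[String, Long], Vector.empty[(String, Long)])) {
        case ((sums, out), (a, b)) =>
          val d = sums.getOrElse(a, 0L) + b
          (sums.updated(a, d), out :+ (a -> d))
      }
    emitted
  }

  // Because a is a unique key of the result, a sink can overwrite the row for a in place.
  def upsertInto(updates: Seq[(String, Long)]): Map[String, Long] =
    updates.foldLeft(Map.empty[String, Long]) { case (table, (a, d)) => table.updated(a, d) }
}
```

With input ("x", 1), ("y", 2), ("x", 3), the emitted updates are ("x", 1), ("y", 2), ("x", 4), and the upserted table ends as x → 4, y → 2.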

What mechanism does Flink use to infer which field(s) form the unique key?

Currently (Flink 1.6.0), Flink SQL derives unique keys only from GROUP BY, and the unique-key information is passed on to downstream operators, for example a SELECT.
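The rule described here can be sketched as a much simplified, hypothetical model. It is not Flink's UpdatingPlanChecker, and the plan node names (Scan, GroupBy, Select, OuterJoin) are invented for illustration. In this model a GROUP BY yields its grouping fields as the key, a SELECT keeps the key only if it projects every key field, and an outer join yields no key, which matches the situation in this thread.

```scala
// Simplified, hypothetical model of unique-key inference.
// This is NOT Flink's UpdatingPlanChecker; the plan node names are invented.
object KeyInference {
  sealed trait Plan
  final case class Scan(table: String) extends Plan
  final case class GroupBy(keys: Seq[String], aggs: Seq[String], input: Plan) extends Plan
  final case class Select(fields: Seq[String], input: Plan) extends Plan
  final case class OuterJoin(left: Plan, right: Plan) extends Plan

  // Returns the unique-key fields of the plan's result, or None if none can be derived.
  def uniqueKeys(plan: Plan): Option[Seq[String]] = plan match {
    case Scan(_)             => None        // sources carry no key information in this model
    case GroupBy(keys, _, _) => Some(keys)  // grouping fields are unique in the result
    case Select(fields, input) =>           // a key survives only if all its fields are projected
      uniqueKeys(input).filter(key => key.forall(k => fields.contains(k)))
    case OuterJoin(_, _)     => None        // no key is derived for outer joins in this model
  }
}
```

For SELECT a, d over GROUP BY a, the model returns Some(Seq("a")). If the SELECT drops a, or if the query ends in an outer join, it returns None, which corresponds to the empty result that makes the upsert sink reject the table.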

Implement a RetractStreamTableSink

Since outer joins produce updates without unique keys, you can use a RetractStreamTableSink to emit the data. There is some documentation on implementing a table sink [2].
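To see roughly where those updates come from, here is a toy model of a streaming FULL OUTER JOIN on a single key. It is written in plain Scala and is not Flink's join operator. The names OuterJoinModel, L, R, and run are hypothetical, and the model assumes at most one row per key arrives on each side. When a row arrives whose partner was already emitted padded with NULL (None here), the model retracts the padded row and adds the joined one. That is the kind of update a retract sink can apply without a key.

```scala
// Toy model in plain Scala (NOT Flink's join operator) of a streaming FULL OUTER JOIN
// on one key. Simplifying assumption: at most one row per key arrives on each side.
object OuterJoinModel {
  sealed trait In
  final case class L(key: String, v: Long) extends In
  final case class R(key: String, v: Long) extends In

  // Output message: (flag, (key, left value, right value)); None plays the role of NULL.
  type Out = (Boolean, (String, Option[Long], Option[Long]))

  def run(inputs: Seq[In]): Vector[Out] = {
    var leftRows = Map.empty[String, Long]
    var rightRows = Map.empty[String, Long]
    val out = Vector.newBuilder[Out]
    inputs.foreach {
      case L(k, v) =>
        require(!leftRows.contains(k), s"model allows one left row per key: $k")
        rightRows.get(k) match {
          case Some(r) =>
            out += ((false, (k, None, Some(r))))    // retract the NULL-padded right-only row
            out += ((true, (k, Some(v), Some(r))))  // add the joined row
          case None =>
            out += ((true, (k, Some(v), None)))     // no partner yet: pad the right side with NULL
        }
        leftRows = leftRows.updated(k, v)
      case R(k, v) =>
        require(!rightRows.contains(k), s"model allows one right row per key: $k")
        leftRows.get(k) match {
          case Some(l) =>
            out += ((false, (k, Some(l), None)))    // retract the NULL-padded left-only row
            out += ((true, (k, Some(l), Some(v))))  // add the joined row
          case None =>
            out += ((true, (k, None, Some(v))))     // no partner yet: pad the left side with NULL
        }
        rightRows = rightRows.updated(k, v)
    }
    out.result()
  }
}
```

Feeding L("a1", 3), R("a1", 5), R("a2", 7) produces four messages in order: an add for (a1, 3, NULL), a retraction of that same row, an add for (a1, 3, 5), and an add for (a2, NULL, 7).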

Best, Hequn

(In reply to 徐涛's message of Sat, Aug 11, 2018 at 6:02 AM.)