Hi All,

I am trying to read data from a Kinesis stream, apply a SQL transformation (DISTINCT), and then write to a CSV sink, which fails with this issue: org.apache.flink.table.api.TableException: AppendStreamTableSink requires that Table has only insert changes. The full code is here: https://github.com/kali786516/FlinkStreamAndSql/blob/614abfc100f74bd8bb7fadb926d946f16f6ef845/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala#L112. Can anyone help me move forward on this issue?

Thanks & Regards
Sri Tummala
Hi Kali,

Currently Flink treats all aggregate functions as retractable. As `distinct` is an aggregate function, the planner considers that it might update or retract records (although from my perspective it won't...). Because the CSV table sink is an append-only sink (it's hard to update what has already been written in the middle of a file), the exception you mentioned occurs.

However, you can use the `toAppendStream` method to change the retractable stream into an append-only stream. For example, `tEnv.sqlQuery(query).distinct().toAppendStream[Row]` gives you an append-only stream, and you can then add the CSV sink to that stream.
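A rough sketch of that conversion, assuming a Scala `StreamExecutionEnvironment` named `env`, a `StreamTableEnvironment` named `tEnv`, and a registered `transactions` table as in the linked code (the follow-up messages below show where this conversion can still fail):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// Run the query, deduplicate via the Table API, then ask the planner
// for an append-only DataStream[Row] that an append-only sink can consume.
val appendStream: DataStream[Row] = tEnv
  .sqlQuery("SELECT * FROM transactions")
  .distinct()
  .toAppendStream[Row]

appendStream.print()
env.execute("kinesis-distinct-to-csv")
```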
Hi Weng, another issue now. Here is the full code: https://github.com/kali786516/FlinkStreamAndSql/blob/15e5e60d6c044bc830f5ef2d79c071389e7460d1/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala#L128 and the pom: https://github.com/kali786516/FlinkStreamAndSql/blob/master/pom.xml.

Exception in thread "main" org.apache.flink.table.api.TableException: Only tables that originate from Scala DataStreams can be converted to Scala DataStreams.
    at org.apache.flink.table.api.scala.TableConversions.toAppendStream(TableConversions.scala:100)
    at com.aws.examples.kinesis.consumer.TransactionExample.KinesisConsumer$.main(KinesisConsumer.scala:126)
    at com.aws.examples.kinesis.consumer.TransactionExample.KinesisConsumer.main(KinesisConsumer.scala)

tEnv.registerDataStream("transactions", data, "cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
Thanks & Regards
Sri Tummala
In reply to this post by Caizhi Weng
Hi Caizhi and Kali,

I think this table should use `toRetractStream` instead of `toAppendStream`, and you should handle the retract messages. (If you just use `distinct`, the messages should always be accumulate messages.)

Best,
JingsongLee
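A minimal sketch of handling the retract stream, assuming the same `tEnv` and a `table` produced by the DISTINCT query:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// toRetractStream emits (flag, row) pairs: flag == true marks an
// accumulate message, flag == false retracts a previously emitted row.
val retractStream: DataStream[(Boolean, Row)] = table.toRetractStream[Row]

// For a plain DISTINCT the flag should always be true, so keeping the
// accumulate messages and dropping the flag yields the data rows.
val rows: DataStream[Row] = retractStream
  .filter(_._1)
  .map(_._2)
```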
Hi Lee,

I did try that.

Option 1: it writes to the CSV file only if I kill the running job.

tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row])

Output:
2> (true,180094108369013,John,Holland,c1ad7a1b73172ef67bd24820438f3f93,2019-07-15 22:48:40,travel,Satterfield-Lowe,81,39.015861,-119.883595)

Option 2: I tried several options; this workaround is kind of working, but I need to replace the brackets, the `true` flag, etc. (my attempt begins with `import java.io.PrintStream`).
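A sketch of that cleanup without `PrintStream`, formatting each `Row` field-by-field so the output has no tuple brackets or `true` flag (the output path is illustrative):

```scala
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// Keep only accumulate messages, then render each Row as a plain CSV
// line instead of relying on the (Boolean, Row) tuple's toString.
val csvLines: DataStream[String] = tEnv
  .toRetractStream[Row](table)
  .filter(_._1)
  .map { case (_, row) =>
    (0 until row.getArity).map(row.getField).mkString(",")
  }

// Note: this text sink may only flush output when buffers fill or the
// job finishes, which would explain results appearing only after the
// job is stopped.
csvLines
  .writeAsText("/tmp/transactions.csv", FileSystem.WriteMode.OVERWRITE)
  .setParallelism(1)
```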
Thanks & Regards
Sri Tummala
In reply to this post by JingsongLee
Hi Lee,

It writes only after the job is killed, and I also don't see all the records. Is there a workaround?

tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row])
Thanks & Regards
Sri Tummala