I wrote a simple program that reads from Kafka using SQL and sinks the result to Kafka. However, only 'update-mode' = 'append' is supported for the sink table, and the SQL query must not contain a GROUP BY clause. Is append mode the only mode supported for the Kafka sink? Thanks, Lei |
Hi Lei, Yes. Currently, the Kafka sink only supports append mode. Other update modes (e.g. upsert mode / retract mode) are on the agenda. For now, you can write a custom KafkaTableSink that implements the UpsertStreamTableSink interface. It receives Tuple2<Boolean, Row> records, where the Boolean indicates the operation (true for an upsert, false for a delete). You can then encode the operation into the record you write to Kafka, or just ignore the operations. Best, Jark
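As a rough illustration of the "encode the operation into Kafka" idea above, here is a minimal plain-Java sketch. It does not use any Flink classes: `ChangelogEncoder`, its `encode` method, and the `U|` / `D|` prefix format are all hypothetical names chosen for this example, and the `isUpsert` parameter only stands in for the Boolean of a `Tuple2<Boolean, Row>` record. A real sink would do something similar inside its serialization step.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Flink: it shows one way to keep the
// upsert/delete flag when writing change records to an append-only topic.
final class ChangelogEncoder {

    // Encodes one change record as "<op>|<field1>,<field2>,...".
    // isUpsert stands in for the Boolean of Tuple2<Boolean, Row>:
    // true = upsert, false = delete.
    static String encode(boolean isUpsert, List<?> fields) {
        StringBuilder sb = new StringBuilder(isUpsert ? "U" : "D").append('|');
        for (int i = 0; i < fields.size(); i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append(fields.get(i));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // An upsert followed by a delete for the same row.
        System.out.println(encode(true, Arrays.asList("user-1", 3)));
        System.out.println(encode(false, Arrays.asList("user-1", 3)));
    }
}
```

A consumer of the topic can then read the prefix to decide whether to apply or remove the row. Ignoring the operations instead would amount to dropping the prefix (and possibly skipping delete records altogether).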
|
Hi Jark, Thanks for the explanation. I understand that a GROUP BY statement results in a non-append stream. I have just tried a join statement and want to send the result to Kafka, but it fails with a similar error: "AppendStreamTableSink requires that Table has only insert changes". Why is the join result not append-only? This confuses me. Thanks, Lei
|
Hi Lei, Are you using a regular left join query? Non-time-based operators (e.g. the regular join in your case) emit results while the input is still incomplete, and update those results as more input arrives (by emitting upsert/retract messages). Time-based operators (e.g. windowed aggregate, interval join) can emit final results because they wait until the input is complete (as indicated by the watermark), so they produce an append-only stream. Maybe you can rewrite your query as an interval join [1] (currently called time-windowed join in the docs) to get an append-only result. Best, Jark
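To make the explanation above more concrete, the following toy model in plain Java mimics how a regular left join has to revise a result it has already emitted. It is not Flink code and makes simplifying assumptions: `LeftJoinChangelog` and its methods are hypothetical names, there is a single join key, and each key arrives at most once on each side.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a regular (non-time-based) LEFT JOIN on a single key.
// Assumption: each key arrives at most once per side.
final class LeftJoinChangelog {
    private final Map<String, String> left = new HashMap<>();
    private final Map<String, String> right = new HashMap<>();
    // Each entry is "+ row" (insert) or "- row" (retraction).
    final List<String> output = new ArrayList<>();

    void onLeft(String key, String value) {
        left.put(key, value);
        // Emit immediately; the right column is null if no match has arrived yet.
        output.add("+ (" + key + ", " + value + ", " + right.get(key) + ")");
    }

    void onRight(String key, String value) {
        right.put(key, value);
        String l = left.get(key);
        if (l != null) {
            // The earlier null-padded result is now wrong: retract it and re-emit.
            output.add("- (" + key + ", " + l + ", null)");
            output.add("+ (" + key + ", " + l + ", " + value + ")");
        }
    }

    public static void main(String[] args) {
        LeftJoinChangelog join = new LeftJoinChangelog();
        join.onLeft("order-1", "book");
        join.onRight("order-1", "shipped");
        join.output.forEach(System.out::println);
    }
}
```

The retraction line is exactly what an append-only sink cannot accept. A time-based join avoids it by holding back the result until the watermark shows that no further matching rows can arrive.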