Kafka sink only support append mode?

Kafka sink only support append mode?

wanglei2@geekplus.com.cn
I wrote a simple program that reads from Kafka using SQL and sinks the result to Kafka.
But only 'update-mode' = 'append' is supported for the sink table, and the SQL query must not contain a GROUP BY clause.
Is append mode the only mode supported for the Kafka sink?

Thanks,
Lei


Re: Kafka sink only support append mode?

Jark Wu-3
Hi Lei,

Yes. Currently, the Kafka sink only supports append mode. Other update modes (e.g. upsert mode / retract mode) are on the agenda.
For now, you can write a custom KafkaTableSink that implements the UpsertStreamTableSink interface. There you will receive Tuple2<Boolean, Row> records,
where the Boolean indicates whether the record is an insert (upsert) or a delete operation. You can then encode the insert/delete operation into the Kafka record or simply ignore it.

Best,
Jark

In reply to [hidden email]'s message of Mon, 9 Mar 2020 at 19:14.
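
To make the two options in the reply above concrete (encode the operation, or ignore it), here is a minimal sketch in plain Java. It does not use any Flink or Kafka classes: Change and KeyValue are hypothetical stand-ins for Tuple2<Boolean, Row> and a Kafka record, and treating field 0 as the key and writing a null value for deletes are assumptions made only for illustration, not what a built-in sink does.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch only: maps (flag, row) change records to key/value pairs. */
public class UpsertEncodingSketch {

    /** Hypothetical stand-in for Flink's Tuple2<Boolean, Row>. */
    static final class Change {
        final boolean upsert;     // true = insert/update, false = delete
        final List<String> row;   // field values; field 0 is assumed to be the key

        Change(boolean upsert, List<String> row) {
            this.upsert = upsert;
            this.row = row;
        }
    }

    /** Hypothetical stand-in for a Kafka record: a key plus a nullable value. */
    static final class KeyValue {
        final String key;
        final String value;       // null is used here to mark a delete

        KeyValue(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Option 1: keep the operation by writing a null value for deletes. */
    static KeyValue encode(Change change) {
        String key = change.row.get(0);
        String value = change.upsert ? String.join(",", change.row) : null;
        return new KeyValue(key, value);
    }

    /** Option 2: ignore the operation; deletes are dropped (null means "send nothing"). */
    static KeyValue encodeIgnoringDeletes(Change change) {
        return change.upsert ? encode(change) : null;
    }

    public static void main(String[] args) {
        KeyValue put = encode(new Change(true, Arrays.asList("user-1", "3")));
        KeyValue del = encode(new Change(false, Arrays.asList("user-1", "3")));
        System.out.println(put.key + " -> " + put.value);
        System.out.println(del.key + " -> " + del.value);
    }
}
```

Whether a null value is a sensible delete marker depends on how the topic is consumed; a downstream reader has to agree on the same convention.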


Re: Re: Kafka sink only support append mode?

wanglei2@geekplus.com.cn
Hi Jark,
Thanks for the explanation.
A GROUP BY statement produces a stream that is not append-only.
I have just tried a join statement and want to send the result to Kafka, but it also fails with the error:
     AppendStreamTableSink requires that Table has only insert changes
Why is the join result not append-only? This confuses me.

Thanks,
Lei
 
In reply to the message of 2020-03-09 19:25, subject "Re: Kafka sink only support append mode?".


Re: Re: Kafka sink only support append mode?

Jark Wu-3
Hi Lei,

Are you running a regular left join query?

Non-time-based operators (e.g. the regular join in your case) emit results before their input is complete,
and update those results as more input arrives (by emitting upsert/retract messages).
Time-based operators (e.g. windowed aggregate, interval join), by contrast, can emit final results because they wait until their input is complete (as signaled by the watermark),
and can therefore produce an append-only stream.

You could try rewriting your query as an interval join [1] (currently called time-windowed join in the docs) to get an append-only result.

Best,
Jark



In reply to [hidden email]'s message of Tue, 10 Mar 2020 at 18:41.
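
The difference described in the last reply can be illustrated with a toy simulation in plain Java. This is only a sketch under strong assumptions, not how Flink implements joins: each key appears at most once per side, the left row arrives before the right row, and the interval variant simply holds every result until the watermark passes the upper time bound. The class names, the ADD/RETRACT labels, and the onWatermark method are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch only: why a regular left join updates results and a time-bounded one need not. */
public class JoinChangelogSketch {

    /** Toy regular left join: emits right away, so earlier results may need retracting. */
    static final class LeftJoin {
        private final Map<String, String> left = new LinkedHashMap<>();
        private final Map<String, String> right = new LinkedHashMap<>();
        final List<String> changelog = new ArrayList<>();

        void onLeft(String key, String value) {
            left.put(key, value);
            // Emit immediately, padded with null if no right row is known yet.
            changelog.add("ADD(" + key + "," + value + "," + right.get(key) + ")");
        }

        void onRight(String key, String value) {
            String l = left.get(key);
            if (l != null) {
                // The earlier result is now outdated: retract it, then emit the new row.
                changelog.add("RETRACT(" + key + "," + l + "," + right.get(key) + ")");
                changelog.add("ADD(" + key + "," + l + "," + value + ")");
            }
            right.put(key, value);
        }
    }

    /** Toy interval left join: right time must lie in [leftTime, leftTime + bound]. */
    static final class IntervalLeftJoin {
        private final long bound;
        private final Map<String, Long> leftTime = new LinkedHashMap<>();
        private final Map<String, String> leftValue = new LinkedHashMap<>();
        private final Map<String, String> rightValue = new LinkedHashMap<>();
        final List<String> changelog = new ArrayList<>();

        IntervalLeftJoin(long bound) {
            this.bound = bound;
        }

        void onLeft(String key, String value, long time) {
            leftTime.put(key, time);
            leftValue.put(key, value);   // buffer only; nothing is emitted yet
        }

        void onRight(String key, String value, long time) {
            Long t = leftTime.get(key);
            if (t != null && time >= t && time <= t + bound) {
                rightValue.put(key, value);
            }
        }

        /** A watermark w is taken to mean: no more rows with time <= w will arrive. */
        void onWatermark(long watermark) {
            Iterator<Map.Entry<String, Long>> it = leftTime.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Long> e = it.next();
                if (e.getValue() + bound <= watermark) {
                    String k = e.getKey();
                    // The result is final, so it is emitted exactly once.
                    changelog.add("ADD(" + k + "," + leftValue.remove(k) + ","
                            + rightValue.remove(k) + ")");
                    it.remove();
                }
            }
        }
    }

    public static void main(String[] args) {
        LeftJoin regular = new LeftJoin();
        regular.onLeft("o1", "order");
        regular.onRight("o1", "shipment");
        regular.changelog.forEach(System.out::println);

        IntervalLeftJoin interval = new IntervalLeftJoin(10);
        interval.onLeft("o1", "order", 100);
        interval.onRight("o1", "shipment", 105);
        interval.onWatermark(110);
        interval.changelog.forEach(System.out::println);
    }
}
```

In this toy run the regular join records three changes for one logical result (an add, a retract, and a corrected add), which is the kind of stream an append-only sink rejects, while the interval variant records a single add.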