From my understanding, your case is not pure deduplication: you want to keep both the previous record and the current record, so the deduplication query cannot satisfy your requirement.
Keeping the last row in deduplication always produces a changelog stream, because we need to retract the previous last value and send the new last value. When sinking a changelog stream, you could use a connector that supports upsert, such as the HBase, JDBC, or upsert-kafka connector; the plain kafka connector only accepts append-only streams, which is why you got that message.
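As a rough sketch of the upsert-kafka approach (table name, topic, and broker address are placeholders, not from the thread), a sink declared with a primary key can accept the changelog produced by a keep-last-row deduplication:

```sql
-- Hypothetical upsert-kafka sink; the PRIMARY KEY is what lets it
-- consume retractions/updates instead of only append-only rows.
CREATE TABLE dedup_sink (
  user_id STRING,
  latest_value STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'output-topic',                        -- placeholder
  'properties.bootstrap.servers' = 'localhost:9092', -- placeholder
  'key.format' = 'json',
  'value.format' = 'json'
);
```

With a plain `'connector' = 'kafka'` table (no primary key, append-only), the same INSERT would fail with the error described above.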
The LAG function is used in over window aggregation and should work in your case, but unfortunately it looks like the LAG function is not implemented correctly; I created an issue[1] to fix this.
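For reference, the intended LAG usage would look something like the following (column and table names are illustrative only, and per the note above this may not behave correctly on the Flink version discussed in this thread):

```sql
-- Compare each row's value with the previous value for the same key.
SELECT
  user_id,
  val,
  LAG(val) OVER (PARTITION BY user_id ORDER BY proc_time) AS prev_val
FROM src;
```

Filtering on `val <> prev_val` (or `prev_val IS NULL`) would then emit only rows where the tracked column actually changed, which is the behavior the original question asks for.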
Best,
Leonard

On Fri, 27 Nov 2020 at 03:28, Leonard Xu <[hidden email]> wrote:
> Hi, Laurent
>
>> Basically, I need to deduplicate, but only keeping in the deduplication state the latest value of the changed column to compare with. While here it seems to keep all previous values…
>
> You can use `ORDER BY proctime() DESC` in the deduplication query; it will keep the last row, which I think is what you want.
> BTW, deduplication has supported event time since 1.12; this will be available soon.
>
> Best,
> Leonard
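The keep-last-row deduplication pattern suggested in the quoted reply can be sketched as follows (table and column names are placeholders; `proc_time` is assumed to be a processing-time attribute declared as `proc_time AS PROCTIME()` in the source DDL):

```sql
-- Keep-last-row deduplication: ORDER BY the time attribute DESC
-- and take rownum = 1, so the newest row per key wins.
SELECT user_id, val, proc_time
FROM (
  SELECT *,
    ROW_NUMBER() OVER (
      PARTITION BY user_id
      ORDER BY proc_time DESC
    ) AS rownum
  FROM src
)
WHERE rownum = 1;
```

Because the "last" row per key changes over time, this query emits retractions for the previously emitted row, which is exactly why its result is a changelog stream rather than an append-only one.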
--
Laurent Exsteens
Data Engineer
(M) +32 (0) 486 20 48 36

EURA NOVA
Rue Emile Francqui, 4
1435 Mont-Saint-Guibert
(T) +32 10 75 02 00
♻ Be green, keep it on the screen