Re: FlinkSQL kafka->dedup->kafka
Posted by
Leonard Xu on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/FlinkSQL-kafka-dedup-kafka-tp39335p39817.html
Hi, Laurent
I'm not sure that would do what I want though. As far as I understand, the deduplication query will always remember any values it has seen. So if I have, for a specific primary key, the following values in another field: "a", "a", "b", "b", "a", "a", the deduplication query could provide me with "a", "b" as a result. But never with "a", "b", "a" (possibility to come back to a previous value), which is what I need.
From my understanding, your case is not a pure deduplication case but want to both keep the previous record and current record, thus the deduplication query can not satisfy your requirement.
Moreover, I tried putting procttime() DESC, and I get the message: The
submitted query is not an append-only query. Only queries producing
exclusively new rows over time are supported at the moment. I do want an append only query.
Keeping last row in Deduplication always produces a changelog stream, because we need to retract the previous last value and sent the new last value. You could use a connector that supports upsert sink like HBase, JDBC or upsert-kafka connector when sink a changelog stream, the kafka connector can only accept append-only stream and thus you got the message.
The LAG function makes complete sense to me here, since it would only compare with the previous record. I just don't understand why it does not get the value of the previous record, whatever offset I give it. Any idea what I might be doing wrong?
The LAG function is used in over window aggregation and should work in your case, but unfortunately look like the LAG function does not implements correctly, I create an issue[1] to fix this.
Best,
Leonard
Hi, Laurent
Basically, I need to deduplicate, but only keeping in the deduplication state the latest value of the changed column to compare with. While here it seems to keep all previous values…
You can use ` ORDER BY proctime() DESC` in the deduplication query, it will keep last row, I think that’s what you want.
BTW, the deduplication has supported event time in 1.12, this will be available soon.
Best,
Leonard
--
Laurent ExsteensData Engineer
(M) +32 (0) 486 20 48 36
EURA NOVA
Rue Emile Francqui, 4
1435 Mont-Saint-Guibert
(T) <a href="tel:%2B32%2010%2075%2002%2000" value="+3210750200" style="color:rgb(17,85,204)" target="_blank" class="">+32 10 75 02 00
♻ Be green, keep it on the screen