Re: FlinkSQL kafka->dedup->kafka

Posted by Laurent Exsteens on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/FlinkSQL-kafka-dedup-kafka-tp39335p39816.html

Hi Leonard,

Thank you for your answer.

I'm not sure that would do what I want, though. As far as I understand, the deduplication query will always remember every value it has seen. So if, for a specific primary key, another field takes the values "a", "a", "b", "b", "a", "a", the deduplication query could give me "a", "b" as a result, but never "a", "b", "a" (that is, with the possibility of coming back to a previous value), which is what I need.
Moreover, I tried putting proctime() DESC, and I get the message: "The submitted query is not an append-only query. Only queries producing exclusively new rows over time are supported at the moment." I do want an append-only query.

The LAG function makes complete sense to me here, since it would only compare with the previous record. I just don't understand why it does not get the value of the previous record, whatever offset I give it. Any idea what I might be doing wrong?
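
To make the difference concrete, here is a minimal sketch in plain Python (not Flink code) of the two behaviours described above. The function names and the key "k1" are made up for illustration, and the first function only reflects my understanding of what the deduplication query does, not its actual implementation.

```python
from typing import Dict, Hashable, Iterable, List, Set, Tuple

# (primary key, value of the tracked column)
Record = Tuple[Hashable, str]


def dedup_seen_values(records: Iterable[Record]) -> List[Record]:
    """Emit a record only the first time a (key, value) pair is seen.

    The state grows with every distinct value, so a value that comes
    back later is never emitted again.
    """
    seen: Set[Record] = set()
    out: List[Record] = []
    for key, value in records:
        if (key, value) not in seen:
            seen.add((key, value))
            out.append((key, value))
    return out


def emit_on_change(records: Iterable[Record]) -> List[Record]:
    """Emit a record only when its value differs from the previous
    value for the same key (what comparing against LAG should give).

    The state holds a single value per key, and the output is
    append-only: nothing already emitted is ever retracted.
    """
    last: Dict[Hashable, str] = {}
    out: List[Record] = []
    for key, value in records:
        if key not in last or last[key] != value:
            out.append((key, value))
        last[key] = value
    return out


events: List[Record] = [("k1", v) for v in ["a", "a", "b", "b", "a", "a"]]

print([v for _, v in dedup_seen_values(events)])  # ['a', 'b']
print([v for _, v in emit_on_change(events)])     # ['a', 'b', 'a']
```

The second function is the behaviour I am after: only the latest value per key is kept in state, and each emitted row is a new row.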

Thanks in advance.

Regards,

Laurent.

On Fri, 27 Nov 2020 at 03:28, Leonard Xu <[hidden email]> wrote:
Hi, Laurent

Basically, I need to deduplicate, but only keeping in the deduplication state the latest value of the changed column to compare with. While here it seems to keep all previous values…

You can use `ORDER BY proctime() DESC` in the deduplication query; it will keep the last row, which I think is what you want.

BTW, deduplication supports event time as of 1.12, which will be available soon.

Best,
Leonard


