Hi Jark,

Thanks for your quick reply. I was indeed expecting it. But that triggers the following questions:
- Is there another way to do this deduplication and generate an append-only stream? MATCH_RECOGNIZE (see the sketch just after these questions)? A UDF? ...?
- If I used Postgres as a sink, what would happen? Would the events be appended as separate rows, or would they replace the record with the same key?
- When will release-1.12 be available? And when would it be integrated into the Ververica Platform?
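To make question 1 concrete, here is the kind of MATCH_RECOGNIZE query I have in mind. Just an untested sketch: it assumes I add a processing-time attribute (e.g. `proctime AS PROCTIME()`) to the source table, since MATCH_RECOGNIZE has to order by a time attribute, and it would miss the very first address of each client because every match needs a preceding row:

-- Sketch only: emit a row each time a client's address changes.
-- MATCH_RECOGNIZE produces an append-only stream, which is the point here.
-- Assumes `proctime` is declared as a processing-time attribute on the source.
SELECT client_number, address
FROM metadata
MATCH_RECOGNIZE (
  PARTITION BY client_number
  ORDER BY proctime
  MEASURES B.address AS address
  ONE ROW PER MATCH
  AFTER MATCH SKIP TO LAST B   -- reuse the changed row as the start of the next match
  PATTERN (A B)                -- A is any row; B is the row where the value changes
  DEFINE B AS B.address <> A.address
) AS T;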
Thanks a lot for your help!

Best Regards,

Laurent.

On Wed, 11 Nov 2020 at 03:31, Jark Wu <[hidden email]> wrote:

Hi Laurent,

This is because the deduplicate node generates an updating stream, however Kafka currently only supports append-only streams. This can be addressed in release-1.12, because we introduce a new connector "upsert-kafka" which supports writing updating streams into Kafka compacted topics.

Does the "Kafka ingestion date" refer to the "kafka message timestamp", i.e. ConsumerRecord#timestamp()? If yes, this is also supported in release-1.12 via the metadata syntax in DDL [1]:

CREATE TABLE kafka_table (
id BIGINT,
name STRING,
`timestamp` BIGINT METADATA -- read timestamp
) WITH (
'connector' = 'kafka',
'topic' = 'test-topic',
'format' = 'avro'
)
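Going back to the "upsert-kafka" connector: a minimal sketch of what such a sink could look like (option values here are placeholders; the PRIMARY KEY is required and determines the Kafka message key):

-- Sketch of an upsert-kafka sink (release-1.12); topic, servers and formats are placeholders.
CREATE TABLE sat_customers_address (
  customer_pk STRING,
  client_number INT,
  address STRING,
  PRIMARY KEY (customer_pk, client_number, address) NOT ENFORCED  -- upsert key; becomes the Kafka record key
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'sat_customers_address',        -- placeholder
  'properties.bootstrap.servers' = '...',   -- placeholder
  'key.format' = 'csv',                     -- serializes the key columns
  'value.format' = 'csv'                    -- serializes the value columns
);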
Best,
Jark

On Tue, 10 Nov 2020 at 23:12, Laurent Exsteens <[hidden email]> wrote:

Hello,

I'm getting an error in Flink SQL when reading from Kafka, deduplicating records and sending them back to Kafka.

The behavior I want is the following:

input:
| client_number | address |
| ------------- | ------- |
| 1             | addr1   |
| 1             | addr1   |
| 1             | addr2   |
| 1             | addr2   |
| 1             | addr1   |
| 1             | addr1   |

output:
| client_number | address |
| ------------- | ------- |
| 1             | addr1   |
| 1             | addr2   |
| 1             | addr1   |

The error seems to say that the stream created by the deduplication query is of "update & delete" type, while Kafka only supports append-only:

Unsupported query
Table sink 'vvp.default.sat_customers_address' doesn't support consuming update and delete changes which is produced by node Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[client_number, address, $2], orderBy=[$3 ASC], select=[client_number, address, $2, $3])

--> Is there a way to create an append-only query from this kind of deduplication query (see my code below)?
--> Would that work if I used, say, a Postgres sink? (A sketch of what I would try is below, after my code.)

Bonus question: can we extract the Kafka ingestion date using Flink SQL? (Here I generated a processing date to allow ordering during deduplication.)

P.S.: I'm on the Ververica Platform, but I guess this error is linked to Flink SQL itself.

Thanks in advance for your help.

Best Regards,

Laurent.

-----------------------------------
-- Read from customers kafka topic
-----------------------------------
CREATE TEMPORARY TABLE customers (
  `client_number` INT,
  `name` VARCHAR(100),
  `address` VARCHAR(100)
)
COMMENT ''
WITH (
  'connector' = 'kafka',
  'format' = 'csv',
  'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
  'topic' = 'customers',
  'csv.field-delimiter' = ';',
  'scan.startup.mode' = 'earliest-offset'
);

-----------------------------------
-- Add metadata
-----------------------------------
CREATE TEMPORARY VIEW metadata AS
SELECT
  *,
  SHA256(CAST(client_number AS STRING)) AS customer_pk,
  CURRENT_TIMESTAMP AS load_date,
  'Kafka topic: customers' AS record_source
FROM customers;

-----------------------------------
-- Deduplicate addresses
-----------------------------------
CREATE TEMPORARY VIEW dedup_address AS
SELECT customer_pk, client_number, load_date, address
FROM (
  SELECT
    customer_pk, client_number, load_date, record_source, address,
    ROW_NUMBER() OVER (
      PARTITION BY customer_pk, client_number, address
      ORDER BY load_date ASC
    ) AS rownum
  FROM metadata
)
WHERE rownum = 1;

-----------------------------------
-- Send to sat_customers_address kafka topic
-----------------------------------
CREATE TEMPORARY TABLE sat_customers_address (
  `customer_pk` VARCHAR(64),
  `client_number` INT,
  `address` VARCHAR(100)
)
COMMENT ''
WITH (
  'connector' = 'kafka',
  'format' = 'csv',
  'properties.bootstrap.servers' = 'kafka-0.kafka-headless.vvp.svc.cluster.local:9092',
  'topic' = 'sat_customers_address'
);

INSERT INTO sat_customers_address
SELECT customer_pk, client_number, address
FROM dedup_address;
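And here is the Postgres variant I would try. A sketch only, with connection details as placeholders; my understanding is that the JDBC connector writes upserts when a PRIMARY KEY is declared (INSERT .. ON CONFLICT for Postgres), and plain appends otherwise:

-- Sketch of a JDBC (Postgres) sink; URL and credentials are placeholders.
-- With a PRIMARY KEY declared, the connector can consume the updating
-- stream from the deduplication and write it as upserts.
CREATE TEMPORARY TABLE sat_customers_address_pg (
  `customer_pk` VARCHAR(64),
  `client_number` INT,
  `address` VARCHAR(100),
  PRIMARY KEY (customer_pk, client_number, address) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://localhost:5432/mydb',  -- placeholder
  'table-name' = 'sat_customers_address',
  'username' = '...',  -- placeholder
  'password' = '...'   -- placeholder
);

INSERT INTO sat_customers_address_pg
SELECT customer_pk, client_number, address
FROM dedup_address;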
--
Laurent Exsteens
Data Engineer
(M) +32 (0) 486 20 48 36
EURA NOVA
Rue Emile Francqui, 4
1435 Mont-Saint-Guibert
(T) +32 10 75 02 00
♻ Be green, keep it on the screen