http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/FlinkSQL-kafka-dedup-kafka-tp39335.html
Hello,
I'm getting an error in Flink SQL when reading from Kafka, deduplicating records, and sending them back to Kafka.
The behavior I want is the following:
input:
| client_number | address |
| ------------------- | ----------- |
| 1 | addr1 |
| 1 | addr1 |
| 1 | addr2 |
| 1 | addr2 |
| 1 | addr1 |
| 1 | addr1 |
output:
| client_number | address |
| ------------------- | ----------- |
| 1 | addr1 |
| 1 | addr2 |
| 1 | addr1 |
The error seems to say that the stream produced by the deduplication query is of "update & delete" type, while the Kafka sink only supports append-only streams:
Unsupported query
Table sink 'vvp.default.sat_customers_address' doesn't support consuming update and delete changes which is produced by node Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[client_number, address, $2], orderBy=[$3 ASC], select=[client_number, address, $2, $3])
--> Is there a way to create an append-only query from this kind of deduplication query? (See my code below; I also sketched, after the deduplication view, the kind of variant I had in mind.)
--> Would it work if I used, say, a Postgres sink? (I sketched a possible sink definition after the INSERT statement below.)
Bonus question: can we extract the Kafka ingestion date using Flink SQL? (Here I generated a processing date to allow ordering during deduplication.)
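For illustration, this is the kind of thing I was hoping for: the Kafka record timestamp exposed directly as a column on the source table. I don't know whether my Flink version / the platform supports metadata columns (or what the exact data type should be), so this is only a sketch:

CREATE TEMPORARY TABLE customers_with_ts (
  `client_number` INT,
  `name` VARCHAR(100),
  `address` VARCHAR(100),
  -- Kafka record timestamp as a metadata column (availability and exact type unverified)
  `kafka_ts` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp'
)
COMMENT ''
WITH (
  'connector' = 'kafka',
  'format' = 'csv',
  'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
  'topic' = 'customers',
  'csv.field-delimiter' = ';',
  'scan.startup.mode' = 'earliest-offset'
);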
P.S.: I'm on the Ververica Platform, but I guess this error is linked to Flink SQL itself.
Thanks in advance for your help.
Best Regards,
Laurent.
-----------------------------------
-- Read from customers kafka topic
-----------------------------------
CREATE TEMPORARY TABLE customers (
  `client_number` INT,
  `name` VARCHAR(100),
  `address` VARCHAR(100)
)
COMMENT ''
WITH (
  'connector' = 'kafka',
  'format' = 'csv',
  'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
  'topic' = 'customers',
  'csv.field-delimiter' = ';',
  'scan.startup.mode' = 'earliest-offset'
);
-----------------------------------
-- Add metadata
-----------------------------------
CREATE TEMPORARY VIEW metadata AS
SELECT *
     , SHA256(CAST(client_number AS STRING)) AS customer_pk
     , CURRENT_TIMESTAMP AS load_date
     , 'Kafka topic: customers' AS record_source
FROM customers;
-----------------------------------
-- Deduplicate addresses
-----------------------------------
CREATE TEMPORARY VIEW dedup_address AS
SELECT customer_pk
     , client_number
     , load_date
     , address
FROM (
    SELECT customer_pk
         , client_number
         , load_date
         , record_source
         , address
         , ROW_NUMBER() OVER (
             PARTITION BY customer_pk, client_number, address
             ORDER BY load_date ASC) AS rownum
    FROM metadata
) WHERE rownum = 1;
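-----------------------------------
-- Sketch for question 1: append-only deduplication?
-----------------------------------
-- NOT part of my running job, just the variant I had in mind: order on a
-- processing-time attribute instead of the generated load_date, hoping that
-- keeping the first row per key needs no update/delete changes. It assumes a
-- computed column `proc_time AS PROCTIME()` added to the customers table
-- (and therefore available in the metadata view). I'm not sure the planner
-- accepts this, hence my question.
CREATE TEMPORARY VIEW dedup_address_append AS
SELECT customer_pk
     , client_number
     , load_date
     , address
FROM (
    SELECT customer_pk
         , client_number
         , load_date
         , address
         , ROW_NUMBER() OVER (
             PARTITION BY customer_pk, client_number, address
             ORDER BY proc_time ASC) AS rownum
    FROM metadata
) WHERE rownum = 1;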
-----------------------------------
-- Send to sat_customers_address kafka topic
-----------------------------------
CREATE TEMPORARY TABLE sat_customers_address (
  `customer_pk` VARCHAR(64),
  `client_number` INT,
  `address` VARCHAR(100)
)
COMMENT ''
WITH (
  'connector' = 'kafka',
  'format' = 'csv',
  'properties.bootstrap.servers' = 'kafka-0.kafka-headless.vvp.svc.cluster.local:9092',
  'topic' = 'sat_customers_address'
);
INSERT INTO sat_customers_address
SELECT customer_pk
     , client_number
     , address
FROM dedup_address;
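-----------------------------------
-- Sketch for question 2: Postgres (JDBC) sink instead of Kafka?
-----------------------------------
-- NOT part of my running job; connection details are placeholders and I have
-- not tested this. My understanding is that a JDBC table declaring a primary
-- key can absorb update/delete changes as upserts, but please correct me.
CREATE TEMPORARY TABLE sat_customers_address_pg (
  `customer_pk` VARCHAR(64),
  `client_number` INT,
  `address` VARCHAR(100),
  PRIMARY KEY (customer_pk, address) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://postgres.example.internal:5432/warehouse', -- placeholder
  'table-name' = 'sat_customers_address',
  'username' = '...', -- placeholder
  'password' = '...'  -- placeholder
);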
--
Laurent Exsteens
Data Engineer
(M) +32 (0) 486 20 48 36
EURA NOVA
Rue Emile Francqui, 4
1435 Mont-Saint-Guibert
(T) +32 10 75 02 00
euranova.eu
research.euranova.eu