FlinkSQL kafka->dedup->kafka

Posted by Laurent Exsteens on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/FlinkSQL-kafka-dedup-kafka-tp39335.html

Hello,

I'm getting an error in Flink SQL when reading from Kafka, deduplicating records and sending them back to Kafka.

The behavior I want is the following:

input:
| client_number | address |
| ------------- | ------- |
| 1             | addr1   |
| 1             | addr1   |
| 1             | addr2   |
| 1             | addr2   |
| 1             | addr1   |
| 1             | addr1   |

output:
| client_number | address |
| ------------- | ------- |
| 1             | addr1   |
| 1             | addr2   |
| 1             | addr1   |

The error seems to say that the deduplication query produces an update/delete changelog stream, while the Kafka sink only supports append-only streams:

Unsupported query
Table sink 'vvp.default.sat_customers_address' doesn't support consuming update and delete changes which is produced by node Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[client_number, address, $2], orderBy=[$3 ASC], select=[client_number, address, $2, $3])


--> Is there a way to create an append-only query from this kind of deduplication query (see my code below, and the sketch I added after the deduplication view)?
--> Would it work if I used, say, a Postgres sink?
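
For the Postgres idea, my understanding is that the JDBC connector writes in upsert mode when the sink table declares a primary key, so it should be able to consume the update/delete changes (assuming the Postgres driver is available on the platform). This is only an untested sketch of what I had in mind; the URL, credentials and the choice of key are placeholders on my side:

CREATE TEMPORARY TABLE sat_customers_address_pg (
    `customer_pk` VARCHAR(64),
    `client_number` INT,
    `address` VARCHAR(100),
    -- hypothetical key choice, just for illustration
    PRIMARY KEY (`customer_pk`, `address`) NOT ENFORCED
)
WITH (
    'connector' = 'jdbc',
    -- placeholder connection details
    'url' = 'jdbc:postgresql://postgres.example:5432/dwh',
    'table-name' = 'sat_customers_address',
    'username' = '...',
    'password' = '...'
);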

Bonus question: can we extract the Kafka ingestion timestamp using Flink SQL? (Here I generated a processing timestamp to allow ordering during deduplication.)
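
For that bonus question, I believe newer Flink versions (1.12+) can expose the Kafka record timestamp as a metadata column in the source DDL. If the platform supports it, I imagine a variant of my source table along these lines would give the ingestion time directly (untested on my version):

CREATE TEMPORARY TABLE customers_with_kafka_ts (
    `client_number` INT,
    `name` VARCHAR(100),
    `address` VARCHAR(100),
    -- Kafka record timestamp as a read-only metadata column (assumes Flink 1.12+)
    `kafka_ts` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp' VIRTUAL
)
WITH (
    'connector' = 'kafka',
    'format' = 'csv',
    'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
    'properties.group.id' = 'flinkSQL',
    'topic' = 'customers',
    'csv.field-delimiter' = ';',
    'scan.startup.mode' = 'earliest-offset'
);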

P.S.: I'm on the Ververica Platform, but I guess this error is linked to Flink SQL itself.

Thanks in advance for your help.

Best Regards,

Laurent.

-----------------------------------
-- Read from customers kafka topic
-----------------------------------
CREATE TEMPORARY TABLE customers (
    `client_number` INT,
    `name` VARCHAR(100),
    `address` VARCHAR(100)
)
COMMENT ''
WITH (
    'connector' = 'kafka',
    'format' = 'csv',
    'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
    'properties.group.id' = 'flinkSQL',
    'topic' = 'customers',
    'csv.field-delimiter' = ';',
    'scan.startup.mode' = 'earliest-offset'
);



-----------------------------------
-- Add metadata
-----------------------------------
CREATE TEMPORARY VIEW metadata AS
SELECT *
    , sha256(cast(client_number AS STRING)) AS customer_pk
    , current_timestamp AS load_date
    , 'Kafka topic: customers' AS record_source
FROM customers;



-----------------------------------
-- Deduplicate addresses
-----------------------------------
CREATE TEMPORARY VIEW dedup_address AS
SELECT customer_pk
    , client_number
    , load_date
    , address
FROM (
    SELECT customer_pk
        , client_number
        , load_date
        , record_source
        , address
        , ROW_NUMBER() OVER (
            PARTITION BY customer_pk, client_number, address
            ORDER BY load_date ASC) AS rownum
    FROM metadata
) WHERE rownum = 1;
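
-----------------------------------
-- Untested sketch: what I imagine an append-only version of the
-- deduplication could look like. As far as I understand the docs,
-- keeping the first row ordered by a processing-time attribute lets
-- Flink produce an append-only result. This assumes a computed column
-- `proc_time` AS PROCTIME() is added to the `customers` table above
-- (it would then flow through the metadata view via SELECT *).
-----------------------------------
CREATE TEMPORARY VIEW dedup_address_append AS
SELECT customer_pk
    , client_number
    , load_date
    , address
FROM (
    SELECT customer_pk
        , client_number
        , load_date
        , address
        , ROW_NUMBER() OVER (
            PARTITION BY customer_pk, client_number, address
            ORDER BY proc_time ASC) AS rownum
    FROM metadata
) WHERE rownum = 1;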






-----------------------------------
-- Send to sat_customers_address kafka topic
-----------------------------------
CREATE TEMPORARY TABLE sat_customers_address (
    `customer_pk` VARCHAR(64),
    `client_number` INT,
    `address` VARCHAR(100)
)
COMMENT ''
WITH (
    'connector' = 'kafka',
    'format' = 'csv',
    'properties.bootstrap.servers' = 'kafka-0.kafka-headless.vvp.svc.cluster.local:9092',
    'properties.group.id' = 'flinkSQL',
    'topic' = 'sat_customers_address'
);

INSERT INTO sat_customers_address
SELECT customer_pk
    , client_number
    , address
FROM dedup_address;
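
-----------------------------------
-- Alternative untested sketch: if the platform is on Flink 1.12 or
-- later, I believe an 'upsert-kafka' sink with a declared primary key
-- can consume update/delete changes directly, without making the query
-- append-only. Key choice and formats below are assumptions on my side.
-----------------------------------
CREATE TEMPORARY TABLE sat_customers_address_upsert (
    `customer_pk` VARCHAR(64),
    `client_number` INT,
    `address` VARCHAR(100),
    PRIMARY KEY (`customer_pk`, `address`) NOT ENFORCED
)
WITH (
    'connector' = 'upsert-kafka',
    'properties.bootstrap.servers' = 'kafka-0.kafka-headless.vvp.svc.cluster.local:9092',
    'topic' = 'sat_customers_address',
    'key.format' = 'csv',
    'value.format' = 'csv'
);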



