Re: FlinkSQL kafka->dedup->kafka

Posted by Jark Wu
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/FlinkSQL-kafka-dedup-kafka-tp39335p39392.html

Hi Laurent,

1. Deduplication keeping the first row generates an append-only stream, but I guess you are expecting to keep the last row, which generates an updating stream. An alternative is to use the "changelog-json" format from this repo [1]; it converts the updating stream into append-only records with a change flag encoded (see the sketch below).
2. Yes, it will replace records with the same key, i.e. an upsert (see the JDBC sketch below).
3. I think it will be available in one or two months; there will be a first release candidate soon.
    You can watch the dev ML. I'm not sure about the plan for the Ververica Platform, cc [hidden email].
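
For (1), a minimal sketch of such a sink, assuming the flink-format-changelog-json jar from [1] is on the classpath and reusing the broker address from your script:

CREATE TEMPORARY TABLE sat_customers_address_changelog (
  `customer_pk` VARCHAR(64),
  `client_number` INT,
  `address` VARCHAR(100)
) WITH (
  'connector' = 'kafka',
  'topic' = 'sat_customers_address',
  'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
  'format' = 'changelog-json'  -- updates are written as append records with a change flag
);

Your existing INSERT statement could then target this table unchanged apart from the name.

For (2), a sketch of a Postgres sink, assuming the JDBC connector and driver are available (the database URL is hypothetical); with a declared primary key the sink writes in upsert mode, so rows with the same key are replaced:

CREATE TEMPORARY TABLE sat_customers_address_pg (
  `customer_pk` VARCHAR(64),
  `client_number` INT,
  `address` VARCHAR(100),
  PRIMARY KEY (`customer_pk`) NOT ENFORCED  -- triggers upsert semantics
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://postgres-host:5432/mydb',  -- hypothetical URL
  'table-name' = 'sat_customers_address'
);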

Best,
Jark

[1]: https://github.com/ververica/flink-cdc-connectors/wiki/Changelog-JSON-Format

On Wed, 11 Nov 2020 at 21:31, Laurent Exsteens <[hidden email]> wrote:
Hi Jark,

Thanks for your quick reply. I was indeed expecting it.

But that triggers the following questions:
  1. Is there another way to do this deduplication and generate an append-only stream? MATCH_RECOGNIZE? A UDF? ...?
  2. If I put Postgres as a sink, what would happen? Would the events be appended, or would they replace the record with the same key?
  3. When will release-1.12 be available? And when would it be integrated into the Ververica Platform?
Thanks a lot for your help!

Best Regards,

Laurent.



On Wed, 11 Nov 2020 at 03:31, Jark Wu <[hidden email]> wrote:
Hi Laurent,

This is because the deduplicate node generates an updating stream, whereas Kafka currently only supports append-only streams.
This will be addressed in release-1.12, which introduces a new connector, "upsert-kafka", that supports writing updating
streams into Kafka compacted topics (see the sketch below).
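
A hedged sketch of what such a sink could look like, reusing the topic and broker from your script (the declared PRIMARY KEY becomes the Kafka message key):

CREATE TEMPORARY TABLE sat_customers_address_upsert (
  `customer_pk` VARCHAR(64),
  `client_number` INT,
  `address` VARCHAR(100),
  PRIMARY KEY (`customer_pk`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',          -- new in release-1.12
  'topic' = 'sat_customers_address',
  'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
  'key.format' = 'csv',                  -- key and value formats are required
  'value.format' = 'csv'
);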

Does the "Kafka ingestion date" refer to the Kafka message timestamp, i.e. ConsumerRecord#timestamp()?
If yes, this is also supported in release-1.12 via the metadata syntax in DDL [1]:

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  `timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA  -- read the Kafka record timestamp
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic',
  'format' = 'avro'
);
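
With that metadata column, a sketch (reusing the kafka_table above) of ordering the deduplication by the Kafka record timestamp instead of a generated processing timestamp:

-- Keep the first record per id, ordered by the Kafka record timestamp.
SELECT id, name
FROM (
  SELECT id, name,
         ROW_NUMBER() OVER (PARTITION BY id ORDER BY `timestamp` ASC) AS rownum
  FROM kafka_table
) WHERE rownum = 1;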


Best,
Jark


On Tue, 10 Nov 2020 at 23:12, Laurent Exsteens <[hidden email]> wrote:
Hello,

I'm getting an error in Flink SQL when reading from Kafka, deduplicating records, and sending them back to Kafka.

The behavior I want is the following:

input:
| client_number | address |
| ------------- | ------- |
| 1             | addr1   |
| 1             | addr1   |
| 1             | addr2   |
| 1             | addr2   |
| 1             | addr1   |
| 1             | addr1   |

output:
| client_number | address |
| ------------- | ------- |
| 1             | addr1   |
| 1             | addr2   |
| 1             | addr1   |

The error seems to say that the stream created by the deduplication query is of "update & delete" type, while Kafka only supports append-only:

Unsupported query
Table sink 'vvp.default.sat_customers_address' doesn't support consuming update and delete changes which is produced by node Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[client_number, address, $2], orderBy=[$3 ASC], select=[client_number, address, $2, $3])


--> Is there a way to create an append-only query from this kind of deduplication query (see my code below)?
--> Would it work if I used, say, a Postgres sink?

Bonus question: can we extract the Kafka ingestion date using Flink SQL? (here I generated a processing date to allow ordering during deduplication)

P.S.: I'm on the Ververica Platform, but I guess this error is linked to Flink SQL itself.

Thanks in advance for your help.

Best Regards,

Laurent.

-----------------------------------
-- Read from customers kafka topic
-----------------------------------
CREATE TEMPORARY TABLE customers (
  `client_number` INT,
  `name` VARCHAR(100),
  `address` VARCHAR(100)
)
COMMENT ''
WITH (
  'connector' = 'kafka',
  'format' = 'csv',
  'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
  'properties.group.id' = 'flinkSQL',
  'topic' = 'customers',
  'csv.field-delimiter' = ';',
  'scan.startup.mode' = 'earliest-offset'
);



-----------------------------------
-- Add metadata
-----------------------------------
CREATE TEMPORARY VIEW metadata AS
SELECT *
  , sha256(CAST(client_number AS STRING)) AS customer_pk  -- hash key for the client
  , CURRENT_TIMESTAMP AS load_date                        -- generated processing date
  , 'Kafka topic: customers' AS record_source
FROM customers;



-----------------------------------
-- Deduplicate addresses
-----------------------------------
CREATE TEMPORARY VIEW dedup_address AS
SELECT customer_pk
  , client_number
  , load_date
  , address
FROM (
  SELECT customer_pk
    , client_number
    , load_date
    , record_source
    , address
    -- keep the first row per (customer_pk, client_number, address), ordered by load_date
    , ROW_NUMBER() OVER (PARTITION BY customer_pk, client_number, address ORDER BY load_date ASC) AS rownum
  FROM metadata
) WHERE rownum = 1;

-----------------------------------
-- Send to sat_customers_address kafka topic
-----------------------------------
CREATE TEMPORARY TABLE sat_customers_address (
  `customer_pk` VARCHAR(64),
  `client_number` INT,
  `address` VARCHAR(100)
)
COMMENT ''
WITH (
  'connector' = 'kafka',
  'format' = 'csv',
  'properties.bootstrap.servers' = 'kafka-0.kafka-headless.vvp.svc.cluster.local:9092',
  'properties.group.id' = 'flinkSQL',
  'topic' = 'sat_customers_address'
);

INSERT INTO sat_customers_address
SELECT customer_pk
  , client_number
  , address
FROM dedup_address;




--
Laurent Exsteens
Data Engineer
(M) +32 (0) 486 20 48 36

EURA NOVA
Rue Emile Francqui, 4
1435 Mont-Saint-Guibert
(T) +32 10 75 02 00
euranova.eu
research.euranova.eu

Be green, keep it on the screen

