Hi Laurent,

1. Currently, it's impossible to convert a deduplication that keeps the last row into an append-only stream.
2. Yes, I think the Ververica platform doesn't support the 'changelog-json' format natively.

However, regarding your case, I think you can keep the first row on the client_number + address key:

SELECT *
FROM (
  SELECT client_number, address, load_date,
    ROW_NUMBER() OVER (PARTITION BY client_number, address ORDER BY proctime() ASC) AS rownum
FROM src)
WHERE rownum = 1

That means duplicate records with the same client_number + address will be ignored, but a new value of address will be emitted as an append-only stream.

Hope this helps you.

Best,
Jark

On Thu, 12 Nov 2020 at 17:49, Laurent Exsteens <[hidden email]> wrote:

Hi Jark,

thanks again for your quick response!

I tried multiple variants of my query by:
- specifying only the primary key in the PARTITION BY clause
- changing the order to DESC to keep the last row
--> I unfortunately always get the same error message.

If I try to run a simple SELECT on the result of this query, I also get the following error: "The submitted query is not an append-only query. Only queries producing exclusively new rows over time are supported at the moment." So whatever change I make, I never get an append-only query --> is there something I missed?

I also tried to write to Kafka as changelog-json, but I got the answer: "The sink connector for table `vvp`.`default`.`sat_customers_address` could not be created. 'changelog-json' is not a supported sink format. Supported sink formats are: ['avro', 'avro-confluent', 'csv', 'json', 'orc', 'parquet']." (maybe because I'm on the Ververica platform?) This also seems to require an extra Kafka topic, so it's not ideal.

I'm starting to wonder if the deduplication query is really what I need. What I need is:
- to forward only the records where some columns (ideally configurable) change for a specific primary key
- in real time (no windowing)
- and to have an append-only stream as a result.

Like this (the output is what should ultimately be published to Kafka and later inserted into an RDBMS):

input:                                        output:
| client_number | address | load_date |       | client_number | address | load_date |
| ------------- | ------- | --------- |       | ------------- | ------- | --------- |
| 1             | addr1   | ts1       |  -->  | 1             | addr1   | ts1       |
| 1             | addr1   | ts2       |
| 1             | addr2   | ts3       |  -->  | 1             | addr2   | ts3       |
| 1             | addr2   | ts4       |
| 1             | addr1   | ts5       |  -->  | 1             | addr1   | ts5       |
| 1             | addr1   | ts6       |

--> Is this deduplication query the right fit for that?
- If yes, how should it be written to generate an append-only stream?
- If not, are there other options? (MATCH_RECOGNIZE, UDF, ...?)

Thanks a lot for your much appreciated help :).

Best Regards,
Laurent.

On Thu, 12 Nov 2020 at 07:26, Jark Wu <[hidden email]> wrote:

Hi Laurent,

> What I want is a record to be forwarded only if some of the columns change

IIUC, what you want is still deduplication with the last row. Keeping the first row will drop all duplicate rows on the same primary key. Keeping the last row will emit updates when there are duplicate rows on the same primary key, which means column value changes will notify downstream operators. The difference between keeping the first row and the last row is specified by the direction of the ORDER BY clause [1].

Best,
Jark

[1]: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
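(To illustrate the ORDER BY direction point above, here is a minimal sketch based on the query from the first reply; only the direction changes, and per the discussion the ASC variant keeps the first row and stays append-only, while the DESC variant keeps the last row and produces an updating stream.)

-- Keep FIRST row per (client_number, address): append-only result.
SELECT *
FROM (
  SELECT client_number, address, load_date,
    ROW_NUMBER() OVER (PARTITION BY client_number, address ORDER BY proctime() ASC) AS rownum
  FROM src)
WHERE rownum = 1;

-- Keep LAST row per (client_number, address): updating result,
-- which an append-only Kafka sink cannot consume.
SELECT *
FROM (
  SELECT client_number, address, load_date,
    ROW_NUMBER() OVER (PARTITION BY client_number, address ORDER BY proctime() DESC) AS rownum
  FROM src)
WHERE rownum = 1;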
On Thu, 12 Nov 2020 at 14:11, Laurent Exsteens <[hidden email]> wrote:

Thanks.

I actually want the first row. What I want is a record to be forwarded only if some of the columns change (of course keyed by the primary key). I used rownum = 1; is that not selecting the first row?

How can I adapt my query to let only the rows that effectively change the values pass, as an append-only stream? If that is not possible, I'll look at converting it afterwards. But I prefer a solution in the deduplication query.

The goal is to show my customer that what they want to achieve is very straightforward in Flink SQL, so the simpler the queries the better. I need to present my conclusions tomorrow.

Thanks a lot already for your help!

Best regards,
Laurent.

On Thu, Nov 12, 2020, 03:43 Jark Wu <[hidden email]> wrote:

Hi Laurent,

1. Deduplication keeping the first row will generate an append-only stream. But I guess you are expecting to keep the last row, which generates an updating stream. An alternative is to use the "changelog-json" format in this repo [1]; it will convert the updating stream into append records with the change flag encoded.
2. Yes. It will replace records with the same key, i.e. an upsert statement.
3. I think it will be available in one or two months. There will be a first release candidate soon. You can watch the dev ML. I'm not sure about the plan for the Ververica platform, cc [hidden email]

Best,
Jark
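(A rough sketch of the changelog-json alternative mentioned in point 1 above, assuming the changelog-json format jar from that repo is available on the classpath; the table name and topic are only illustrative.)

CREATE TEMPORARY TABLE sat_customers_address_changelog (
  `customer_pk` VARCHAR(64),
  `client_number` INT,
  `address` VARCHAR(100)
) WITH (
  'connector' = 'kafka',
  'topic' = 'sat_customers_address_changelog',  -- illustrative topic name
  'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
  'format' = 'changelog-json'  -- encodes each row together with its change flag
);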
On Wed, 11 Nov 2020 at 21:31, Laurent Exsteens <[hidden email]> wrote:

Hi Jark,

thanks for your quick reply. I was indeed expecting it. But that triggers the following questions:
- Is there another way to do this deduplication and generate an append-only stream? MATCH_RECOGNIZE? UDF? ...?
- If I would put Postgres as a sink, what would happen? Would the events be appended, or would they replace the record with the same key?
- When will release-1.12 be available? And when would it be integrated in the Ververica platform?
Thanks a lot for your help!

Best Regards,
Laurent.

On Wed, 11 Nov 2020 at 03:31, Jark Wu <[hidden email]> wrote:

Hi Laurent,

This is because the deduplicate node generates an updating stream; however, Kafka currently only supports append-only streams. This can be addressed in release-1.12, because we introduce a new connector "upsert-kafka" which supports writing updating streams into Kafka compacted topics.

Does the "Kafka ingestion date" refer to the "Kafka message timestamp", i.e. ConsumerRecord#timestamp()? If yes, this is also supported in release-1.12 via the metadata syntax in DDL [1]:

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  `timestamp` BIGINT METADATA  -- read timestamp
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic',
  'format' = 'avro'
)

Best,
Jark
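(For the updating deduplication result itself, a sketch of what a sink on the new "upsert-kafka" connector mentioned above might look like in 1.12; the table name, topic and key column are only illustrative.)

CREATE TABLE sat_customers_address_upsert (
  `client_number` INT,
  `address` VARCHAR(100),
  PRIMARY KEY (`client_number`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'sat_customers_address',  -- ideally a compacted topic
  'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
  'key.format' = 'csv',   -- key and value formats are required by upsert-kafka
  'value.format' = 'csv'
);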
On Tue, 10 Nov 2020 at 23:12, Laurent Exsteens <[hidden email]> wrote:

Hello,

I'm getting an error in Flink SQL when reading from Kafka, deduplicating records and sending them back to Kafka.

The behavior I want is the following:

input:
| client_number | address |
| ------------- | ------- |
| 1             | addr1   |
| 1             | addr1   |
| 1             | addr2   |
| 1             | addr2   |
| 1             | addr1   |
| 1             | addr1   |

output:
| client_number | address |
| ------------- | ------- |
| 1             | addr1   |
| 1             | addr2   |
| 1             | addr1   |

The error seems to say that the type of stream created by the deduplication query is of "update & delete" type, while Kafka only supports append-only:

Unsupported query
Table sink 'vvp.default.sat_customers_address' doesn't support consuming update and delete changes which is produced by node Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[client_number, address, $2], orderBy=[$3 ASC], select=[client_number, address, $2, $3])

--> Is there a way to create an append-only query from this kind of deduplication query (see my code below)?
--> Would that work if I used, say, a Postgres sink?

Bonus question: can we extract the Kafka ingestion date using Flink SQL? (Here I generated a processing date to allow ordering during deduplication.)

P.S.: I'm on the Ververica Platform, but I guess this error is linked to Flink SQL itself.

Thanks in advance for your help.

Best Regards,
Laurent.

-----------------------------------
-- Read from customers kafka topic
-----------------------------------
CREATE TEMPORARY TABLE customers (
  `client_number` INT,
  `name` VARCHAR(100),
  `address` VARCHAR(100)
)
COMMENT ''
WITH (
  'connector' = 'kafka',
  'format' = 'csv',
  'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
  'topic' = 'customers',
  'csv.field-delimiter' = ';',
  'scan.startup.mode' = 'earliest-offset'
);

-----------------------------------
-- Add metadata
-----------------------------------
CREATE TEMPORARY VIEW metadata AS
SELECT *,
  sha256(cast(client_number as STRING)) AS customer_pk,
  current_timestamp AS load_date,
  'Kafka topic: customers' AS record_source
FROM customers;

-----------------------------------
-- Deduplicate addresses
-----------------------------------
CREATE TEMPORARY VIEW dedup_address AS
SELECT customer_pk, client_number, load_date, address
FROM (
  SELECT customer_pk, client_number, load_date, record_source, address,
    ROW_NUMBER() OVER (PARTITION BY customer_pk, client_number, address ORDER BY load_date ASC) AS rownum
  FROM metadata)
WHERE rownum = 1;

-----------------------------------
-- Send to sat_customers_address kafka topic
-----------------------------------
CREATE TEMPORARY TABLE sat_customers_address (
  `customer_pk` VARCHAR(64),
  `client_number` INT,
  `address` VARCHAR(100)
)
COMMENT ''
WITH (
  'connector' = 'kafka',
  'format' = 'csv',
  'properties.bootstrap.servers' = 'kafka-0.kafka-headless.vvp.svc.cluster.local:9092',
  'topic' = 'sat_customers_address'
);

INSERT INTO sat_customers_address
SELECT customer_pk, client_number, address
FROM dedup_address;
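(For reference, applying the keep-first-row suggestion from the first reply at the top of the thread to the dedup_address view above would look roughly like this; ordering directly on proctime() is taken from that suggested query.)

CREATE TEMPORARY VIEW dedup_address AS
SELECT customer_pk, client_number, load_date, address
FROM (
  SELECT customer_pk, client_number, load_date, address,
    ROW_NUMBER() OVER (PARTITION BY client_number, address ORDER BY proctime() ASC) AS rownum
  FROM metadata)
WHERE rownum = 1;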
--
Laurent Exsteens
Data Engineer
(M) +32 (0) 486 20 48 36

EURA NOVA
Rue Emile Francqui, 4
1435 Mont-Saint-Guibert
(T) +32 10 75 02 00
♻ Be green, keep it on the screen