Hello,

I'm getting an error in Flink SQL when reading from Kafka, deduplicating records and sending them back to Kafka.

The behavior I want is the following:

input:
| client_number | address |
| ------------- | ------- |
| 1             | addr1   |
| 1             | addr1   |
| 1             | addr2   |
| 1             | addr2   |
| 1             | addr1   |
| 1             | addr1   |

output:
| client_number | address |
| ------------- | ------- |
| 1             | addr1   |
| 1             | addr2   |
| 1             | addr1   |

The error seems to say that the stream created by the deduplication query is of "update & delete" type, while Kafka only supports append-only:

    Unsupported query
    Table sink 'vvp.default.sat_customers_address' doesn't support consuming update and delete changes which is produced by node Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[client_number, address, $2], orderBy=[$3 ASC], select=[client_number, address, $2, $3])

--> Is there a way to create an append-only query from this kind of deduplication query (see my code below)?
--> Would that work if I used, say, a Postgres sink?

Bonus question: can we extract the Kafka ingestion date using Flink SQL? (Here I generated a processing date to allow ordering during deduplication.)

P.S.: I'm on the Ververica Platform, but I guess this error is linked to Flink SQL itself.

Thanks in advance for your help.

Best Regards,

Laurent.

-----------------------------------
-- Read from customers kafka topic
-----------------------------------
CREATE TEMPORARY TABLE customers (
    `client_number` INT,
    `name` VARCHAR(100),
    `address` VARCHAR(100)
)
COMMENT ''
WITH (
    'connector' = 'kafka',
    'format' = 'csv',
    'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
    'topic' = 'customers',
    'csv.field-delimiter' = ';',
    'scan.startup.mode' = 'earliest-offset'
);

-----------------------------------
-- Add metadata
-----------------------------------
CREATE TEMPORARY VIEW metadata AS
SELECT *
    , sha256(CAST(client_number AS STRING)) AS customer_pk
    , current_timestamp AS load_date
    , 'Kafka topic: customers' AS record_source
FROM customers;

-----------------------------------
-- Deduplicate addresses
-----------------------------------
CREATE TEMPORARY VIEW dedup_address AS
SELECT customer_pk
    , client_number
    , load_date
    , address
FROM (
    SELECT customer_pk
        , client_number
        , load_date
        , record_source
        , address
        , ROW_NUMBER() OVER (PARTITION BY customer_pk, client_number, address ORDER BY load_date ASC) AS rownum
    FROM metadata
)
WHERE rownum = 1;

-----------------------------------
-- Send to sat_customers_address kafka topic
-----------------------------------
CREATE TEMPORARY TABLE sat_customers_address (
    `customer_pk` VARCHAR(64),
    `client_number` INT,
    `address` VARCHAR(100)
)
COMMENT ''
WITH (
    'connector' = 'kafka',
    'format' = 'csv',
    'properties.bootstrap.servers' = 'kafka-0.kafka-headless.vvp.svc.cluster.local:9092',
    'topic' = 'sat_customers_address'
);

INSERT INTO sat_customers_address
SELECT customer_pk
    , client_number
    , address
FROM dedup_address;
Hi Laurent,

This is because the deduplicate node generates an updating stream, while the Kafka connector currently only supports append-only streams. This is addressed in release 1.12, which introduces a new connector, "upsert-kafka", that supports writing updating streams into Kafka compacted topics.

Does the "Kafka ingestion date" refer to the "Kafka message timestamp", i.e. ConsumerRecord#timestamp()? If yes, this is also supported in release 1.12 via the metadata syntax in DDL [1]:

CREATE TABLE kafka_table (
    id BIGINT,
    name STRING,
    ts TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp'  -- read the Kafka record timestamp
) WITH (
    'connector' = 'kafka',
    'topic' = 'test-topic',
    'format' = 'avro'
)

Best,
Jark

On Tue, 10 Nov 2020 at 23:12, Laurent Exsteens <[hidden email]> wrote:
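For reference, a minimal sketch of the upsert-kafka sink Jark mentions, roughly as documented for Flink 1.12; the table name and key/value formats here are illustrative, and the connector requires a PRIMARY KEY to derive the Kafka message key:

CREATE TEMPORARY TABLE sat_customers_address_upsert (
    `customer_pk`   VARCHAR(64),
    `client_number` INT,
    `address`       VARCHAR(100),
    PRIMARY KEY (customer_pk) NOT ENFORCED            -- used as the Kafka message key
) WITH (
    'connector' = 'upsert-kafka',                     -- available from Flink 1.12
    'topic' = 'sat_customers_address',
    'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
    'key.format' = 'csv',
    'value.format' = 'csv'
);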
Hi Jark,

Thanks for your quick reply. I was indeed expecting it. But that triggers the following questions:

Thanks a lot for your help!

Best Regards,

Laurent.

On Wed, 11 Nov 2020 at 03:31, Jark Wu <[hidden email]> wrote:
Hi Laurent,

1. Deduplication keeping the first row generates an append-only stream. But I guess you are expecting to keep the last row, which generates an updating stream. An alternative is to use the "changelog-json" format from this repo [1]; it converts the updating stream into append-only records with the change flag encoded in each record.

2. Yes. It will replace records with the same key, i.e. an upsert.

3. I think it will be available in one or two months. There will be a first release candidate soon; you can watch the dev mailing list. I'm not sure about the plan for the Ververica Platform, cc [hidden email].

Best,
Jark

On Wed, 11 Nov 2020 at 21:31, Laurent Exsteens <[hidden email]> wrote:
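A sketch of what the changelog-json alternative Jark describes might look like, assuming the flink-format-changelog-json artifact from the flink-cdc-connectors project is on the classpath; the table and topic names are taken from this thread:

CREATE TEMPORARY TABLE sat_customers_address_changelog (
    `customer_pk`   VARCHAR(64),
    `client_number` INT,
    `address`       VARCHAR(100)
) WITH (
    'connector' = 'kafka',
    'topic' = 'sat_customers_address',
    'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
    'format' = 'changelog-json'   -- encodes the +I/-U/+U/-D change flag into each JSON record
);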
Hi Jark,

Thanks again for your quick response!

I tried multiple variants of my query by:
- specifying only the primary key in the PARTITION BY clause
- changing the order to DESC to keep the last row
--> I unfortunately always get the same error message.

If I try a simple SELECT on the result of this query, I also get the following error:

    The submitted query is not an append-only query. Only queries producing exclusively new rows over time are supported at the moment.

So whatever change I make, I never get an append-only query.
--> Is there something I missed?

I also tried to write to Kafka as changelog-json, but I got the answer:

    The sink connector for table `vvp`.`default`.`sat_customers_address` could not be created. 'changelog-json' is not a supported sink format. Supported sink formats are: ['avro', 'avro-confluent', 'csv', 'json', 'orc', 'parquet'].

(Maybe because I'm on the Ververica Platform?) This also seems to require an extra Kafka topic, so it is not ideal.

I'm starting to wonder if the deduplication query is really what I need. What I need is:
- to forward only the records where some columns (ideally configurable) change for a specific primary key,
- in real time (no windowing),
- and have as a result an append-only stream.

Like this:

input:                                       output (what should ultimately be published to Kafka and later inserted into an RDBMS):

| client_number | address | load_date |      | client_number | address | load_date |
| ------------- | ------- | --------- |      | ------------- | ------- | --------- |
| 1             | addr1   | ts1       |  --> | 1             | addr1   | ts1       |
| 1             | addr1   | ts2       |
| 1             | addr2   | ts3       |  --> | 1             | addr2   | ts3       |
| 1             | addr2   | ts4       |
| 1             | addr1   | ts5       |  --> | 1             | addr1   | ts5       |
| 1             | addr1   | ts6       |

--> Is this deduplication query the right fit for that?
- If yes, how should it be written to generate an append-only stream?
- If not, are there other options? (MATCH_RECOGNIZE, UDF, ...?)

Thanks a lot for your much appreciated help :).

Best Regards,

Laurent.

On Thu, 12 Nov 2020 at 07:26, Jark Wu <[hidden email]> wrote:
Hi Laurent,

1. Currently, it's impossible to convert deduplication keeping the last row into an append-only stream.

2. Yes, I think the Ververica Platform doesn't support the 'changelog-json' format natively.

However, regarding your case, I think you can keep the first row on the client_number + address key:

SELECT *
FROM (
    SELECT client_number,
           address,
           load_date,
           ROW_NUMBER() OVER (PARTITION BY client_number, address ORDER BY proctime() ASC) AS rownum
    FROM src
)
WHERE rownum = 1

That means duplicate records with the same client_number + address will be ignored, but each new value of address will be emitted as an append-only stream.

Hope this helps you.

Best,
Jark

On Thu, 12 Nov 2020 at 17:49, Laurent Exsteens <[hidden email]> wrote:
I see what my mistake was: I was using a field in my ORDER BY, while it only supports proctime() for now. That allows me to create an append-only stream, thanks a lot!

However, it still does not allow me to do what I need:

If I use both my primary key and the changing column in PARTITION BY, then it does not allow me to come back to a previous value of my changed column:

SELECT client_number
    , address
    , proctime() as load_date
FROM (
    SELECT client_number
        , address
        , ROW_NUMBER() OVER (PARTITION BY client_number, address ORDER BY proctime() ASC) AS rownum
    FROM customers
)
WHERE rownum = 1;

input:                                       output:

| client_number | address | load_date |      | client_number | address | load_date |
| ------------- | ------- | --------- |      | ------------- | ------- | --------- |
| 1             | addr1   | ts1       |  --> | 1             | addr1   | ts1       |
| 1             | addr1   | ts2       |
| 1             | addr2   | ts3       |  --> | 1             | addr2   | ts3       |
| 1             | addr2   | ts4       |
| 1             | addr1   | ts5       |  --> | 1             | addr1   | ts5       | <-- this one does not show --> I cannot change back to a previous value :(
| 1             | addr1   | ts6       |

If however I only put my primary key in PARTITION BY, then I only get the first value of my changed column:

SELECT client_number
    , address
    , proctime() as load_date
FROM (
    SELECT client_number
        , address
        , ROW_NUMBER() OVER (PARTITION BY client_number ORDER BY proctime() ASC) AS rownum
    FROM customers
)
WHERE rownum = 1;

input:                                       output:

| client_number | address | load_date |      | client_number | address | load_date |
| ------------- | ------- | --------- |      | ------------- | ------- | --------- |
| 1             | addr1   | ts1       |  --> | 1             | addr1   | ts1       |
| 1             | addr1   | ts2       |
| 1             | addr2   | ts3       |  --> | 1             | addr2   | ts3       | <-- this one does not show :(
| 1             | addr2   | ts4       |
| 1             | addr1   | ts5       |  --> | 1             | addr1   | ts5       | <-- this one does not show :(
| 1             | addr1   | ts6       |

Basically, I need to deduplicate, but keep in the deduplication state only the latest value of the changed column to compare with, while here it seems to keep all previous values...

Is there a way to obtain the behavior I need (with this deduplication method or another one)?

Thanks in advance.

Best Regards,

Laurent.

On Thu, 12 Nov 2020 at 12:48, Jark Wu <[hidden email]> wrote:
I'm now trying with MATCH_RECOGNIZE:

SELECT *
FROM customers
MATCH_RECOGNIZE (
    PARTITION BY client_number
    ORDER BY proctime()
    MEASURES
        LAST(B.client_number) as client_number,
        LAST(B.address) as address
    PATTERN (A* B)
    DEFINE
        B AS LAST(A.address, 1) is NULL OR B.address <> LAST(A.address, 1)
) as T;

However, I get the following error:

    SQL validation failed. Index 0 out of bounds for length 0

I've read the documentation and tried different formulations, but I don't manage to make this work. It should be possible to express what I need, though, since the examples in the documentation allow for far more complex patterns. What am I doing wrong?

I would still prefer something simpler such as the deduplication query (if it could work as I need it): it would be a much harder sell to propose Flink SQL as a way to lower the expertise required for our jobs if a "simple" deduplication already needs a complex query involving CEP...

Thanks in advance for your help!

Best Regards,

Laurent.

On Thu, 12 Nov 2020 at 17:22, Laurent Exsteens <[hidden email]> wrote:
Hello,

It seems like LAG would probably be the right function to use. However, I get unexpected results from it:

-----------------------------------
-- Deduplicate addresses
-----------------------------------
--CREATE TEMPORARY VIEW dedup_address as
SELECT *
FROM (
    SELECT client_number
        , address
        , LAG(address) OVER (PARTITION BY client_number ORDER BY proctime() ASC) AS previous_address
    FROM customers
);

This gives me the same value for LAG(address) as for address. What is even more strange is that I get the same result no matter what offset I ask for.

What am I doing wrong here?

Thanks in advance for your help.

Regards,

Laurent.

On Thu, 12 Nov 2020 at 21:56, Laurent Exsteens <[hidden email]> wrote:
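For context, the full change-detection query this is building toward would look roughly like the sketch below: it keeps the first address per client plus every row where the address differs from the previous one. It assumes LAG returns the previous row's value, which, per Leonard's later reply, it did not do correctly in the Flink version used here; the view name mirrors the one from the original post.

CREATE TEMPORARY VIEW dedup_address AS
SELECT client_number
    , address
    , load_date
FROM (
    SELECT client_number
        , address
        , proctime() AS load_date
        , LAG(address, 1) OVER (PARTITION BY client_number ORDER BY proctime() ASC) AS previous_address
    FROM customers
)
-- keep the first address per client and every row where the address changed
WHERE previous_address IS NULL OR address <> previous_address;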
In reply to this post by Laurent Exsteens
Hi, Laurent
You can use `ORDER BY proctime() DESC` in the deduplication query; it will keep the last row, which I think is what you want. BTW, deduplication supports event time in 1.12, which will be available soon.

Best,
Leonard
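For illustration, a sketch of the keep-last-row variant Leonard describes, applied to the customers table from this thread; note that, as the follow-ups below explain, keeping the last row produces an updating stream rather than an append-only one:

SELECT client_number
    , address
FROM (
    SELECT client_number
        , address
        , ROW_NUMBER() OVER (PARTITION BY client_number ORDER BY proctime() DESC) AS rownum
    FROM customers
)
-- rownum = 1 with DESC ordering keeps the most recent row per client_number
WHERE rownum = 1;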
Hi Leonard,

Thank you for your answer.

I'm not sure that would do what I want, though. As far as I understand, the deduplication query will always remember every value it has seen. So if I have, for a specific primary key, the following values in another field: "a", "a", "b", "b", "a", "a", the deduplication query could provide me with "a", "b" as a result, but never with "a", "b", "a" (the possibility to come back to a previous value), which is what I need.

Moreover, when I tried putting proctime() DESC, I got the message:

    The submitted query is not an append-only query. Only queries producing exclusively new rows over time are supported at the moment.

And I do want an append-only query.

The LAG function makes complete sense to me here, since it would only compare with the previous record. I just don't understand why it does not return the value of the previous record, whatever offset I give it. Any idea what I might be doing wrong?

Thanks in advance.

Regards,

Laurent.

On Fri, 27 Nov 2020 at 03:28, Leonard Xu <[hidden email]> wrote:
Hi, Laurent
From my understanding, your case is not a pure deduplication case: you want to keep both the previous record and the current record, so the deduplication query cannot satisfy your requirement.
Keeping the last row in deduplication always produces a changelog stream, because we need to retract the previous last value and send the new last value. To sink a changelog stream, you could use a connector that supports upsert sinks, such as HBase, JDBC, or the upsert-kafka connector; the plain kafka connector can only accept an append-only stream, which is why you got that message.
The LAG function is used in OVER window aggregations and should work for your case, but unfortunately it looks like LAG is not implemented correctly; I created an issue [1] to fix this.

Best,
Leonard
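To illustrate the JDBC option Leonard mentions (and Laurent's earlier question about a Postgres sink), here is a rough sketch of a JDBC sink table: when a PRIMARY KEY is declared, the JDBC connector writes in upsert mode and can therefore consume the changelog stream. The URL, credentials, and target table name are placeholders.

CREATE TEMPORARY TABLE sat_customers_address_pg (
    `customer_pk`   VARCHAR(64),
    `client_number` INT,
    `address`       VARCHAR(100),
    PRIMARY KEY (customer_pk) NOT ENFORCED            -- upsert key for the JDBC sink
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://postgres:5432/warehouse',   -- placeholder
    'table-name' = 'sat_customers_address',
    'username' = '...',
    'password' = '...'
);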
In reply to this post by Jark Wu-3
Hi Laurent,

With respect to the Ververica Platform, we will support Flink 1.12 and add "upsert-kafka" as a packaged connector in our next minor release, which we target for February.

Cheers,

Konstantin

On Thu, Nov 12, 2020 at 3:43 AM Jark Wu <[hidden email]> wrote:
In reply to this post by Leonard Xu
Hi Leonard,
Indeed, that's what I came to realise during our discussion on this email chain. I'm sorry if it caused confusion. I'm still not sure how to express this requirement concisely: "the need to deduplicate, but let previous values come back after a different value has appeared"...
That's what I understood, indeed. But in my case I really do want to insert, not upsert. Just for information: the goal is to historize Kafka messages in real time. Each message could potentially be split to store information in multiple tables (in my example, name and address would be inserted into two different tables), and the history should be kept and enriched with the ingestion date. The fact that the Kafka message can be split across multiple tables is what creates the "deduplication" requirement (in my example, the address could have changed but not the name, and we don't want to add a record with no business value to the table containing the names). And of course, a field can be changed twice and end up with the same value again, and that is business information we do want to keep.
Thanks a lot! I'll follow the issue. I would love to try to fix it... but after a quick look at that code, I'm not sure it's the best way to start contributing. I don't understand what should be changed in that code, let alone find what generated that code and how it should be fixed...

In the meantime, I guess the only other option would be MATCH_RECOGNIZE? Could you help me find what I did wrong in this query?

SELECT *
FROM customers
MATCH_RECOGNIZE (
    PARTITION BY client_number
    ORDER BY proctime()
    MEASURES
        B.client_number as client_number,
        B.address as address
    PATTERN (A* B)
    DEFINE
        B AS LAST(A.address, 1) is NULL OR B.address <> LAST(A.address, 1)
) as T;

I get the following error:

    SQL validation failed. Index 0 out of bounds for length 0

Thanks a lot for your help!

Laurent.
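As a rough illustration of the splitting Laurent describes (one Kafka message historized into separate address and name tables), the job could contain one INSERT per target table from the same source; the sat_customers_name table and the dedup_name view are hypothetical here, mirroring the address ones from earlier in the thread:

-- One INSERT per target table; on platforms that support it, these can be
-- grouped into a single statement set so the source topic is read only once.
INSERT INTO sat_customers_address
SELECT customer_pk
    , client_number
    , address
FROM dedup_address;        -- view keeping only rows where the address changed

INSERT INTO sat_customers_name
SELECT customer_pk
    , client_number
    , name
FROM dedup_name;           -- hypothetical sibling view keeping only rows where the name changed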
Hi Laurent,

Did you manage to find the error in your MATCH_RECOGNIZE statement? If I had to take a guess, I'd say it's because you are accessing A, but due to the * quantifier there might actually be no A event.

Cheers,

Konstantin

On Fri, Nov 27, 2020 at 10:03 PM Laurent Exsteens <[hidden email]> wrote: