Re: FlinkSQL kafka->dedup->kafka

Posted by Laurent Exsteens on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/FlinkSQL-kafka-dedup-kafka-tp39335p39434.html

I'm now trying with a MATCH_RECOGNIZE:


SELECT *
FROM customers
MATCH_RECOGNIZE (
  PARTITION BY client_number
  ORDER BY proctime()
  MEASURES
    LAST(B.client_number) AS client_number,
    LAST(B.address) AS address
  PATTERN (A* B)
  DEFINE
    B AS LAST(A.address, 1) IS NULL OR B.address <> LAST(A.address, 1)
) AS T;


However, I get the following error:
SQL validation failed. Index 0 out of bounds for length 0

I've read the documentation and tried different formulations, but I can't get this to work.
It should nevertheless be possible to express what I need, since the examples in the documentation allow for far more complex patterns.
What am I doing wrong?
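
One thing I'm not sure about: the documentation says the ORDER BY in MATCH_RECOGNIZE must reference a time attribute column, and I don't know whether calling proctime() directly is accepted there. As a sketch (untested, and assuming the customers DDL declared a processing-time column such as proc_time AS PROCTIME(), which my current DDL does not), the query would become:

SELECT *
FROM customers
MATCH_RECOGNIZE (
  PARTITION BY client_number
  ORDER BY proc_time  -- proc_time is a hypothetical processing-time attribute declared in the DDL
  MEASURES
    LAST(B.client_number) AS client_number,
    LAST(B.address) AS address
  PATTERN (A* B)
  DEFINE
    B AS LAST(A.address, 1) IS NULL OR B.address <> LAST(A.address, 1)
) AS T;

This may or may not be the cause of the validation error above.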

I would still prefer something simpler, such as the deduplication query (if it could be made to work as I need), as it would be a much harder sell to propose Flink SQL as a way to lower the expertise required for our jobs if a "simple" deduplication already needs a complex query involving CEP...

Thanks in advance for your help!

Best Regards,

Laurent.



On Thu, 12 Nov 2020 at 17:22, Laurent Exsteens <[hidden email]> wrote:
I see what my mistake was: I was using a field in my ORDER BY, while it only supports proctime() for now.

That allows me to create an append only stream, thanks a lot!

However, it still does not allow me to do what I need:


If I use both my primary key and the changing column in PARTITION BY, then I can never come back to a previous value of my changed column:
SELECT client_number
     , address
     , proctime() as load_date
FROM (
  SELECT client_number
       , address
       , ROW_NUMBER() OVER (PARTITION BY client_number, address ORDER BY proctime() ASC) AS rownum
  FROM customers
) where rownum = 1;
input:                                          output:
| client_number | address | load_date |         | client_number | address | load_date |
| ------------- | ------- | --------- |         | ------------- | ------- | --------- |
| 1             | addr1   | ts1       |   -->   | 1             | addr1   | ts1       |
| 1             | addr1   | ts2       |
| 1             | addr2   | ts3       |   -->   | 1             | addr2   | ts3       |
| 1             | addr2   | ts4       |
| 1             | addr1   | ts5       |   -->   | 1             | addr1   | ts5       | <-- this expected row does not show: I cannot change back to a previous value :(
| 1             | addr1   | ts6       |




If, however, I only put my primary key in PARTITION BY, then I only get the first value of my changed column:
SELECT client_number
     , address
     , proctime() as load_date
FROM (
  SELECT client_number
       , address
       , ROW_NUMBER() OVER (PARTITION BY client_number ORDER BY proctime() ASC) AS rownum
  FROM customers
) where rownum = 1;
input:                                          output:
| client_number | address | load_date |         | client_number | address | load_date |
| ------------- | ------- | --------- |         | ------------- | ------- | --------- |
| 1             | addr1   | ts1       |   -->   | 1             | addr1   | ts1       |
| 1             | addr1   | ts2       |
| 1             | addr2   | ts3       |   -->   | 1             | addr2   | ts3       | <-- this expected row does not show :(
| 1             | addr2   | ts4       |
| 1             | addr1   | ts5       |   -->   | 1             | addr1   | ts5       | <-- this expected row does not show :(
| 1             | addr1   | ts6       |



Basically, I need to deduplicate while keeping in the deduplication state only the latest value of the changed column to compare against, whereas here it seems to keep all previous values...

Is there a way to obtain the behavior I need (with this deduplication method or another one)?
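
To state the intended semantics more precisely, what I'm after is essentially the following (written with LAG purely as a sketch of the logic; I don't know whether LAG is supported over streaming OVER windows in this version):

SELECT client_number, address, load_date
FROM (
  SELECT client_number
       , address
       , proctime() AS load_date
       -- prev_address: the previous address seen for this client (sketch only)
       , LAG(address) OVER (PARTITION BY client_number ORDER BY proctime()) AS prev_address
  FROM customers
)
WHERE prev_address IS NULL OR address <> prev_address;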

Thanks in advance.

Best Regards,

Laurent.


On Thu, 12 Nov 2020 at 12:48, Jark Wu <[hidden email]> wrote:
Hi Laurent,

1. Currently, it's impossible to convert a deduplication that keeps the last row into an append-only stream.
2. Yes, I think the Ververica platform doesn't support the 'changelog-json' format natively.

However, regarding your case, I think you can keep the first row on the client_number + address key:

SELECT *
FROM (
   SELECT client_number, address, load_date,
     ROW_NUMBER() OVER 
       (PARTITION BY client_number, address ORDER BY proctime() ASC) AS rownum
   FROM src)
WHERE rownum = 1


That means duplicate records with the same client_number + address will be ignored,
but each new value of address will be emitted as an append-only stream.

Hope this helps you. 

Best,
Jark


On Thu, 12 Nov 2020 at 17:49, Laurent Exsteens <[hidden email]> wrote:
Hi Jark,

thanks again for your quick response!

I tried multiple variants of my query by:
- specifying only the primary key in the PARTITION BY clause
- changing the order to DESC to keep the last row

--> Unfortunately, I always get the same error message.
If I try to run a simple SELECT on the result of this query, I also get the following error: "The submitted query is not an append-only query. Only queries producing exclusively new rows over time are supported at the moment." So whatever change I make, I never get an append-only query --> is there something I missed?

I also tried to write to Kafka as changelog-json, but I got the following error: "The sink connector for table `vvp`.`default`.`sat_customers_address` could not be created. 'changelog-json' is not a supported sink format. Supported sink formats are: ['avro', 'avro-confluent', 'csv', 'json', 'orc', 'parquet']." (maybe because I'm on the Ververica platform?)
This also seems to require an extra Kafka topic, so it is not ideal.


I'm starting to wonder if the deduplication query is really what I need.

What I need is:
- to forward only the records where some columns (ideally configurable) change for a specific primary key.
- in realtime (no windowing)
- and have as a result an append-only stream.

Like this:

input:                                          output (this is what should ultimately be published to Kafka and later inserted into an RDBMS):
| client_number | address | load_date |         | client_number | address | load_date |
| ------------- | ------- | --------- |         | ------------- | ------- | --------- |
| 1             | addr1   | ts1       |   -->   | 1             | addr1   | ts1       |
| 1             | addr1   | ts2       |
| 1             | addr2   | ts3       |   -->   | 1             | addr2   | ts3       |
| 1             | addr2   | ts4       |
| 1             | addr1   | ts5       |   -->   | 1             | addr1   | ts5       |
| 1             | addr1   | ts6       |

--> Is the deduplication query therefore the right fit?
     - If yes, how should it be written to generate an append-only stream?
     - If not, are there other options? (MATCH_RECOGNIZE, a UDF, ...?)

Thanks a lot for your much appreciated help :).

Best Regards,

Laurent.


On Thu, 12 Nov 2020 at 07:26, Jark Wu <[hidden email]> wrote:
Hi Laurent,

> What I want is a record to be forwarded only if some of the columns change

IIUC, what you want is still deduplication with the last row. 
Keeping the first row will drop all duplicate rows with the same primary key. 
Keeping the last row will emit updates whenever there are duplicate rows with the same primary key, which means column value changes will notify downstream operators. 
The difference between keeping the first row and the last row is specified by the direction of the ORDER BY clause [1]. 
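
In other words, a minimal sketch on your customers table (only the ORDER BY direction differs between the two behaviours):

SELECT client_number, address
FROM (
  SELECT client_number
       , address
       -- ASC keeps the first row per key (append-only result);
       -- DESC would keep the last row per key (updating result).
       , ROW_NUMBER() OVER (PARTITION BY client_number ORDER BY proctime() ASC) AS rownum
  FROM customers
) WHERE rownum = 1;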

Best,
Jark





On Thu, 12 Nov 2020 at 14:11, Laurent Exsteens <[hidden email]> wrote:
Thanks.

I actually want the first row. What I want is for a record to be forwarded only if some of the columns change (of course keyed by the primary key). I used rownum = 1; is that not selecting the first row?

How can I adapt my query so that only the rows that effectively change the values pass through, as an append-only stream?

If that is not possible, I'll look at converting it afterwards, but I would prefer a solution within the deduplication query.
The goal is to show my customer that what they want to achieve is very straightforward in Flink SQL, so the simpler the queries the better. I need to present my conclusions tomorrow.

Thanks a lot already for your help!

Best regards,

Laurent.


On Thu, Nov 12, 2020, 03:43 Jark Wu <[hidden email]> wrote:
Hi Laurent,

1. Deduplication keeping the first row will generate an append-only stream. But I guess you are expecting to keep the last row, which generates an updating stream. An alternative is to use the "changelog-json" format from this repo [1]; it will convert the updating stream into append-only records with the change flag encoded (see the sketch after this list).
2. Yes. It will replace records with the same key, i.e. it behaves as an upsert.
3. I think it will be available in one or two months. There will be a first release candidate soon.
    You can watch the dev ML. I'm not sure about the plan for the Ververica platform, cc [hidden email]
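
A rough sketch of such a sink table (assuming the changelog-json format jar from that repo is on the classpath; the topic and broker addresses below are placeholders):

CREATE TEMPORARY TABLE sat_customers_address_changelog (
  `client_number` INT,
  `address` VARCHAR(100)
) WITH (
  'connector' = 'kafka',
  'topic' = 'sat_customers_address',  -- placeholder topic
  'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',  -- placeholder brokers
  'format' = 'changelog-json'  -- encodes the change flag (insert/update/delete) with each record
);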

Best,
Jark


On Wed, 11 Nov 2020 at 21:31, Laurent Exsteens <[hidden email]> wrote:
Hi Jark,

thanks for your quick reply. I was indeed expecting it.

But that triggers the following questions:
  1. Is there another way to do this deduplication and generate an append-only stream? Match Recognize? UDF? ...?
  2. If I were to use Postgres as a sink, what would happen? Would the events be appended or would they replace the record with the same key?
  3. When will release-1.12 be available? And when would it be integrated into the Ververica platform?

Thanks a lot for your help!

Best Regards,

Laurent.



On Wed, 11 Nov 2020 at 03:31, Jark Wu <[hidden email]> wrote:
Hi Laurent,

This is because the deduplicate node generates an updating stream, while the Kafka sink currently only supports append-only streams.
This will be addressed in release-1.12, which introduces a new connector, "upsert-kafka", that supports writing updating
streams into Kafka compacted topics (see the sketch below). 
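
A minimal sketch of what such a sink could look like in 1.12 (the topic, brokers and formats below are placeholders, not a tested configuration):

CREATE TABLE sat_customers_address_upsert (
  `client_number` INT,
  `address` VARCHAR(100),
  PRIMARY KEY (`client_number`) NOT ENFORCED  -- key used for upserts / topic compaction
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'sat_customers_address',  -- placeholder topic
  'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',  -- placeholder brokers
  'key.format' = 'csv',
  'value.format' = 'csv'
);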

Does the "Kafka ingestion date" refer to "kafka message timestamp", i.e. ConsumerRecord#timestamp()? 
If yes, this is also supported in release-1.12 via metadata syntax in DDL [1]:

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  `timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA  -- read the Kafka record timestamp
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic',
  'format' = 'avro'
)


Best,
Jark


On Tue, 10 Nov 2020 at 23:12, Laurent Exsteens <[hidden email]> wrote:
Hello,

I'm getting an error in Flink SQL when reading from Kafka, deduplicating records and sending them back to Kafka.

The behavior I want is the following:

input:
| client_number | address |
| ------------- | ------- |
| 1             | addr1   |
| 1             | addr1   |
| 1             | addr2   |
| 1             | addr2   |
| 1             | addr1   |
| 1             | addr1   |

output:
| client_number | address |
| ------------- | ------- |
| 1             | addr1   |
| 1             | addr2   |
| 1             | addr1   |

The error seems to say that the stream created by the deduplication query is of "update & delete" type, while Kafka only supports append-only streams:

Unsupported query
Table sink 'vvp.default.sat_customers_address' doesn't support consuming update and delete changes which is produced by node Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[client_number, address, $2], orderBy=[$3 ASC], select=[client_number, address, $2, $3])


--> Is there a way to create an append-only query from this kind of deduplication query (see my code below)?
--> Would it work if I used, say, a Postgres sink?

Bonus question: can we extract the Kafka ingestion date using Flink SQL? (here I generated a processing date to allow ordering during deduplication)

P.S.: I'm on the Ververica Platform, but I guess this error is linked to Flink SQL itself.

Thanks in advance for your help.

Best Regards,

Laurent.

-----------------------------------
-- Read from customers kafka topic
-----------------------------------
CREATE TEMPORARY TABLE customers (
  `client_number` INT,
  `name` VARCHAR(100),
  `address` VARCHAR(100)
)
COMMENT ''
WITH (
  'connector' = 'kafka',
  'format' = 'csv',
  'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
  'properties.group.id' = 'flinkSQL',
  'topic' = 'customers',
  'csv.field-delimiter' = ';',
  'scan.startup.mode' = 'earliest-offset'
);



-----------------------------------
-- Add metadata
-----------------------------------
CREATE TEMPORARY VIEW metadata AS
SELECT *
, sha256(cast(client_number as STRING)) AS customer_pk
, current_timestamp AS load_date
, 'Kafka topic: customers' AS record_source
FROM customers;



-----------------------------------
-- Deduplicate addresses
-----------------------------------
CREATE TEMPORARY VIEW dedup_address AS
SELECT customer_pk
     , client_number
     , load_date
     , address
FROM (
  SELECT customer_pk
       , client_number
       , load_date
       , record_source
       , address
       , ROW_NUMBER() OVER (PARTITION BY customer_pk, client_number, address ORDER BY load_date ASC) AS rownum
  FROM metadata
) where rownum = 1;






-----------------------------------
-- Send to sat_customers_address kafka topic
-----------------------------------
CREATE TEMPORARY TABLE sat_customers_address (
  `customer_pk` VARCHAR(64),
  `client_number` INT,
  `address` VARCHAR(100)
)
COMMENT ''
WITH (
  'connector' = 'kafka',
  'format' = 'csv',
  'properties.bootstrap.servers' = 'kafka-0.kafka-headless.vvp.svc.cluster.local:9092',
  'properties.group.id' = 'flinkSQL',
  'topic' = 'sat_customers_address'
);

INSERT INTO sat_customers_address
SELECT customer_pk
     , client_number
     , address
FROM dedup_address;




--
Laurent Exsteens
Data Engineer
(M) +32 (0) 486 20 48 36

EURA NOVA

Rue Emile Francqui, 4

1435 Mont-Saint-Guibert

(T) +32 10 75 02 00

euranova.eu

research.euranova.eu


 Be green, keep it on the screen

