Hello,
Flink 1.12.1 (pyflink)

I am deduplicating CDC records coming from Maxwell in a Kafka topic. Here is the SQL:

  CREATE TABLE stats_topic(
    `data` ROW<`id` BIGINT, `account` INT, `upd_ts` BIGINT>,
    `ts` BIGINT,
    `xid` BIGINT,
    row_ts AS TO_TIMESTAMP(FROM_UNIXTIME(data.upd_ts)),
    WATERMARK FOR `row_ts` AS `row_ts` - INTERVAL '15' SECOND
  ) WITH (
    'connector' = 'kafka',
    'format' = 'json',
    'topic' = 'stats_topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'test_group'
  )

  CREATE TABLE sink_table(
    `id` BIGINT,
    `account` INT,
    `upd_ts` BIGINT
  ) WITH (
    'connector' = 'kafka',
    'format' = 'json',
    'topic' = 'sink_topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'test_group'
  )

  INSERT INTO sink_table
  SELECT
    id,
    account,
    upd_ts
  FROM (
    SELECT
      id,
      account,
      upd_ts,
      ROW_NUMBER() OVER (PARTITION BY id ORDER BY upd_ts DESC) AS rownum
    FROM stats_topic
    GROUP BY id, account, upd_ts, TUMBLE(row_ts, INTERVAL '20' MINUTE)
  )
  WHERE rownum = 1

As there are a lot of CDC records for a single ID, I'm using ROW_NUMBER() and producing the results on a 20-minute interval to the sink_topic. The problem is that Flink doesn't allow me to use it in combination with the kafka connector:

  pyflink.util.exceptions.TableException: Table sink 'default_catalog.default_database.sink_table' doesn't support consuming update and delete changes which is produced by node Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[$f0], orderBy=[$f2 DESC], select=[$f0, $f1, $f2])

If I use the upsert-kafka connector everything is fine, but then I receive empty JSON records in the sink topic:

  {"id": 111111, "account": 4, "upd_ts": 1612334952}
  {"id": 222222, "account": 4, "upd_ts": 1612334953}
  {}
  {"id": 333333, "account": 4, "upd_ts": 1612334955}
  {}
  {"id": 444444, "account": 4, "upd_ts": 1612334956}

Thank you!
Any help please? Is there a way to use the "last row" from a deduplication in an append-only stream, or to tell upsert-kafka not to produce null records in the sink?

Thank you

On Thu, Feb 4, 2021 at 1:22 PM meneldor <[hidden email]> wrote:
Hi,

AFAIK this should be supported in 1.12 via FLINK-19568 [1]. I'm pulling in Timo and Jark who might know better.

[1] https://issues.apache.org/jira/browse/FLINK-19857

Regards,
Roman

On Mon, Feb 8, 2021 at 9:14 AM meneldor <[hidden email]> wrote:
Hi,
could the problem be that you are mixing the OVER and TUMBLE windows with each other? The TUMBLE is correctly defined over the time attribute `row_ts`, but the OVER window is defined using the regular column `upd_ts`. This might be why the query is not append-only but updating. Maybe you can split the problem into subqueries and share the plan with us using .explain()?

The nulls in upsert-kafka should be gone once you enable compaction mode in Kafka.

I hope this helps.

Regards,
Timo

On 08.02.21 10:53, Khachatryan Roman wrote:
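For illustration, a minimal sketch of what ordering the ranking by a time attribute could look like, assuming TUMBLE_ROWTIME is used to carry the window's rowtime out of the aggregation so that the OVER window no longer orders by the plain BIGINT column `upd_ts`. Column names follow the query in the first message; this is untested, and whether the planner then produces an append-only result also depends on the keep-first/keep-last semantics discussed below:

  INSERT INTO sink_table
  SELECT id, account, upd_ts
  FROM (
    SELECT
      id,
      account,
      upd_ts,
      -- rank within each id by the window rowtime instead of the raw upd_ts column
      ROW_NUMBER() OVER (PARTITION BY id ORDER BY win_rowtime DESC) AS rownum
    FROM (
      SELECT
        id,
        account,
        upd_ts,
        TUMBLE_ROWTIME(row_ts, INTERVAL '20' MINUTE) AS win_rowtime
      FROM stats_topic
      GROUP BY id, account, upd_ts, TUMBLE(row_ts, INTERVAL '20' MINUTE)
    )
  )
  WHERE rownum = 1

The compaction part of the suggestion is a Kafka topic setting rather than SQL: with cleanup.policy=compact on the sink topic, superseded values and tombstones are removed during compaction, but a consumer reading before compaction runs will still see them.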
Thanks for the quick reply, Timo. I'll test with the row_ts and compaction mode suggestions. However, I've read somewhere in the archives that an append-only stream is only possible if I extract "the first" record from the ranking, which in my case is the oldest record.

Regards

On Mon, Feb 8, 2021, 18:56 Timo Walther <[hidden email]> wrote:
Unfortunately, using row_ts doesn't help. Setting the Kafka topic cleanup.policy to compact is not a very good idea, as it increases CPU and memory usage and might lead to other problems. So for now I'll just ignore the null records.

Is there anyone who is successfully deduplicating CDC records into either a Kafka topic or S3 files (CSV/Parquet)?

Thanks!

On Mon, Feb 8, 2021 at 7:13 PM meneldor <[hidden email]> wrote:
Hi,

Are you sure that the null records are not actually tombstone records? If you use upsert tables, you usually want to have them + compaction. Or how else will you deal with deletions?
What do you want to achieve? CDC records should be deduplicated by definition. I'm assuming that you want to aggregate the state to the current state. If so, how do you decide when a record is complete (e.g. no future updates) and can be written?

I have the feeling that you are using CDC in a place where you don't want to use it, so maybe it helps to first explain your use case. Is stream processing a good fit for you in the first place?

On Tue, Feb 9, 2021 at 10:37 AM meneldor <[hidden email]> wrote:
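For reference, a minimal sketch of how the sink from the first message could be declared as an upsert-kafka table — the declared primary key is what makes deletions arrive as tombstones (records with a key and a null value) in the topic. Topic, bootstrap servers, and JSON formats are taken from the original DDL; choosing `id` as the primary key is an assumption:

  CREATE TABLE sink_table(
    `id` BIGINT,
    `account` INT,
    `upd_ts` BIGINT,
    PRIMARY KEY (`id`) NOT ENFORCED  -- assumption: id is the upsert key
  ) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'sink_topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'key.format' = 'json',
    'value.format' = 'json'
  )

With such a sink, every upstream delete or retraction is written as a tombstone, which is consistent with the empty records observed earlier; log compaction on the topic eventually removes superseded values and tombstones, though consumers reading before compaction will still see them.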
> Are you sure that the null records are not actually tombstone records? If you use upsert tables you usually want to have them + compaction. Or how else will you deal with deletions?

Yes, they are tombstone records, but I cannot avoid them because the deduplication query can't produce an append-only stream when keeping the last row.

> What do you want to achieve? CDC records should be deduplicated by definition.

Yes, I want to aggregate the state to the current state. The problem is that the records are going to be merged into a database by an ETL job every hour, so I don't need all the updates but only the last one; that's why I'm using a window function, and the future updates will be evaluated by the MERGE query in the ETL as well.

I've changed the query to instead use max(upd_ts), which does produce an append-only stream and works, but I'm not 100% sure the result is the same:

  INSERT INTO sink_table

Thanks!

On Thu, Feb 11, 2021 at 12:55 PM Arvid Heise <[hidden email]> wrote:
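The rest of that statement is cut off in the archive. Purely as an illustration of the max(upd_ts) approach described above, a hypothetical sketch of what such a query could look like (grouping keys and window size follow the earlier ROW_NUMBER() query; this is not necessarily the exact statement that was run):

  INSERT INTO sink_table
  SELECT
    id,
    account,
    MAX(upd_ts) AS upd_ts  -- keep only the latest change timestamp per group and window
  FROM stats_topic
  GROUP BY id, account, TUMBLE(row_ts, INTERVAL '20' MINUTE)

A group window aggregation like this emits one final row per group and window once the watermark passes the window end, which is why it is append-only. One difference from the ROW_NUMBER() version is that every non-aggregated column (here account) has to stay in the GROUP BY or be aggregated, which can change the result if such a column varies between updates of the same id.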
The query which I'm testing now (trying to avoid the deduplication query because of the tombstones) is almost correct, but there are two questions which I can't find an answer to:

1. Some of the IDs just stop being produced.
2. Does the TUMBLE window select only the records whose upd_ts is new, or will the query always produce all the records in the dynamic table stats_topic with the max(upd_ts)?

  CREATE TABLE stats_topic(
  INSERT INTO sink_table

Thank you!

On Thu, Feb 11, 2021 at 1:36 PM meneldor <[hidden email]> wrote:
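Following Timo's earlier suggestion to look at the plan, one way to check whether a statement is append-only or still produces updates is Flink's EXPLAIN statement. A hypothetical sketch reusing the max(upd_ts) query outlined above (not the exact, truncated statement from this message):

  EXPLAIN PLAN FOR
  INSERT INTO sink_table
  SELECT id, account, MAX(upd_ts) AS upd_ts
  FROM stats_topic
  GROUP BY id, account, TUMBLE(row_ts, INTERVAL '20' MINUTE)

If the optimized plan still contains a Rank or another update-producing node, the plain kafka sink would be rejected with the TableException shown at the top of the thread; a pure group window aggregation should show up as an append-only window aggregate node instead.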