Kafka connector doesn't support consuming update and delete changes in Table SQL API

meneldor
Hello,
Flink 1.12.1 (PyFlink)
I am deduplicating CDC records coming from Maxwell in a Kafka topic. Here is the SQL:

CREATE TABLE stats_topic(
          `data` ROW<`id` BIGINT, `account` INT, `upd_ts` BIGINT>,
          `ts` BIGINT,
          `xid` BIGINT ,
          row_ts AS TO_TIMESTAMP(FROM_UNIXTIME(data.upd_ts)),
          WATERMARK FOR `row_ts` AS `row_ts`   - INTERVAL '15' SECOND
        ) WITH (
          'connector' = 'kafka',
          'format' = 'json',
          'topic' = 'stats_topic',
          'properties.bootstrap.servers' = 'localhost:9092',
          'properties.group.id' = 'test_group'
        )

CREATE TABLE sink_table(
          `id` BIGINT,
          `account` INT,
          `upd_ts` BIGINT
        ) WITH (
          'connector' = 'kafka',
          'format' = 'json',
          'topic' = 'sink_topic',
          'properties.bootstrap.servers' = 'localhost:9092',
          'properties.group.id' = 'test_group'
        )


INSERT INTO sink_table
SELECT
  id,
  account,
  upd_ts
FROM (
  SELECT
    id,
    account,
    upd_ts,
    ROW_NUMBER() OVER (PARTITION BY id ORDER BY upd_ts DESC) AS rownum
  FROM stats_topic
  GROUP BY id, account, upd_ts, TUMBLE(row_ts, INTERVAL '20' MINUTE)
)
WHERE rownum = 1

As there are a lot of CDC records for a single ID, I'm using ROW_NUMBER() and producing them to the sink_topic on a 20-minute interval. The problem is that Flink doesn't allow me to use it in combination with the kafka connector:
pyflink.util.exceptions.TableException: Table sink 'default_catalog.default_database.sink_table' doesn't support consuming update and delete changes which is produced by node Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[$f0], orderBy=[$f2 DESC], select=[$f0, $f1, $f2])

If I use the upsert-kafka connector everything is fine, but then I receive empty JSON records in the sink topic:
{"id": 111111, "account": 4, "upd_ts": 1612334952}
{"id": 222222, "account": 4, "upd_ts": 1612334953}
{}
{"id": 333333, "account": 4, "upd_ts": 1612334955}
{}
{"id": 444444, "account": 4, "upd_ts": 1612334956}

Thank you! 

Re: Kafka connector doesn't support consuming update and delete changes in Table SQL API

meneldor
Any help please? Is there a way to use the "last row" from a deduplication as an append-only stream, or to tell upsert-kafka not to produce null records in the sink?

Thank you

Re: Kafka connector doesn't support consuming update and delete changes in Table SQL API

r_khachatryan
Hi,

AFAIK this should be supported in 1.12 via FLINK-19568 [1]
I'm pulling in Timo and Jark who might know better.

[1] https://issues.apache.org/jira/browse/FLINK-19857

Regards,
Roman


Re: Kafka connector doesn't support consuming update and delete changes in Table SQL API

Timo Walther
Hi,

could the problem be that you are mixing an OVER window and a TUMBLE window
with each other? The TUMBLE is correctly defined over the time attribute
`row_ts`, but the OVER window is ordered by the regular column `upd_ts`. This
might be the reason why the query is not append-only but updating.

Maybe you can split the problem into subqueries and share the plan with
us using .explain()?
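
Something like the following sketch is one way such a split could look. This is
only an illustration, reusing the simplified column names from the query above;
`win_rowtime` is a made-up alias, and the windowed aggregation exposes its
rowtime via TUMBLE_ROWTIME so that the ranking can be ordered by a time
attribute. Whether the resulting plan is append-only is exactly what .explain()
would show.

    -- Sketch only: the inner query aggregates per 20-minute window and exposes
    -- the window rowtime; the outer query ranks by that time attribute.
    INSERT INTO sink_table
    SELECT id, account, upd_ts
    FROM (
      SELECT
        id,
        account,
        upd_ts,
        ROW_NUMBER() OVER (PARTITION BY id ORDER BY win_rowtime DESC) AS rownum
      FROM (
        SELECT
          id,
          account,
          upd_ts,
          TUMBLE_ROWTIME(row_ts, INTERVAL '20' MINUTE) AS win_rowtime
        FROM stats_topic
        GROUP BY id, account, upd_ts, TUMBLE(row_ts, INTERVAL '20' MINUTE)
      )
    )
    WHERE rownum = 1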

The nulls in upsert-kafka should be gone once you enable compaction mode
in Kafka.

I hope this helps.

Regards,
Timo


Re: Kafka connector doesn't support consuming update and delete changes in Table SQL API

meneldor
Thanks for the quick reply, Timo. I'll test the row_ts and compaction suggestions. However, I've read somewhere in the archives that an append-only stream is only possible if I extract "the first" record from the ranking, which in my case is the oldest record.
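
(For reference, a minimal sketch of that "first row" variant, assuming the same
simplified column names as in the original query: order ascending by the time
attribute and keep rownum = 1, i.e. the oldest record per id.)

    SELECT id, account, upd_ts
    FROM (
      SELECT
        id,
        account,
        upd_ts,
        ROW_NUMBER() OVER (PARTITION BY id ORDER BY row_ts ASC) AS rownum
      FROM stats_topic
    )
    WHERE rownum = 1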

Regards

Re: Kafka connector doesn't support consuming update and delete changes in Table SQL API

meneldor
Unfortunately, using row_ts doesn't help. Setting the Kafka topic's cleanup.policy to compact is not a very good idea, as it increases CPU and memory usage and might lead to other problems.
So for now I'll just ignore the null records. Is there anyone who is successfully deduplicating CDC records into either a Kafka topic or S3 files (CSV/Parquet)?

Thanks!

Re: Kafka connector doesn't support consuming update and delete changes in Table SQL API

Arvid Heise-4
Hi,

Are you sure that the null records are not actually tombstone records? If you use upsert tables you usually want to have them plus compaction. Or how else will you deal with deletions?

> Is there anyone who is successfully deduplicating CDC records into either a Kafka topic or S3 files (CSV/Parquet)?
What do you want to achieve? CDC records should be deduplicated by definition.
I'm assuming that you want to aggregate the state to the current state. If so, how do you decide when the record is complete (e.g. no future updates) and can be written?

I have the feeling that you are using CDC at a place where you don't want to use it, so maybe it helps to first explain your use case. Is stream processing a good fit for you in the first place?

Re: Kafka connector doesn't support consuming update and delete changes in Table SQL API

meneldor
> Are you sure that the null records are not actually tombstone records? If you use upsert tables you usually want to have them plus compaction. Or how else will you deal with deletions?
Yes, they are tombstone records, but I cannot avoid them because the deduplication query can't produce an append-only stream when keeping the last row.

> What do you want to achieve? CDC records should be deduplicated by definition.
> I'm assuming that you want to aggregate the state to the current state. If so, how do you decide when the record is complete (e.g. no future updates) and can be written?
> I have the feeling that you are using CDC at a place where you don't want to use it, so maybe it helps to first explain your use case. Is stream processing a good fit for you in the first place?
Yes, I want to aggregate the state to the current state. The problem is that the records are going to be merged into a database by an ETL job every hour. So I don't need all the updates, only the last one; that's why I'm using a window function, and future updates will be evaluated by the MERGE query in the ETL as well.

I've changed the query to use max(upd_ts) instead, which produces an append-only stream, and it works, but I'm not 100% sure the result is the same:
INSERT INTO sink_table
SELECT DISTINCT id, account, upd_ts
FROM stats_topic t, (
   SELECT id, account, max(upd_ts) AS maxTs
   FROM stats_topic
   GROUP BY id, account, TUMBLE(row_ts, INTERVAL '20' MINUTE)
) s
WHERE t.id = s.id AND t.upd_ts = s.maxTs AND t.account = s.account

Thanks!

Re: Kafka connector doesn't support consuming update and delete changes in Table SQL API

meneldor
The query I'm testing now (trying to avoid the deduplication query because of the tombstones) is almost correct, but there are two things I can't find an answer to:
1. Some of the ids simply stop being produced.
2. Does the TUMBLE window select only the records whose upd_ts is new, or will the query always produce all records in the dynamic table stats_topic with the max(upd_ts)?
CREATE TABLE stats_topic(
          `data` ROW<`id` BIGINT, `account` INT, `upd_ts` BIGINT>,
          `ts` BIGINT,
          `xid` BIGINT,
          row_ts AS TO_TIMESTAMP(FROM_UNIXTIME(data.upd_ts)),
          WATERMARK FOR `row_ts` AS `row_ts` - INTERVAL '15' SECOND
        ) WITH (
          'connector' = 'kafka',
          'format' = 'json',
          'topic' = 'stats_topic',
          'properties.bootstrap.servers' = 'localhost:9092',
          'properties.group.id' = 'test_group'
        )

INSERT INTO sink_table
SELECT DISTINCT id, account, upd_ts
FROM stats_topic t, (
   SELECT id, max(upd_ts) AS maxTs
   FROM stats_topic
   GROUP BY id, TUMBLE(row_ts, INTERVAL '20' MINUTE)
) s
WHERE t.id = s.id AND t.upd_ts = s.maxTs

Thank you!
