Does the Kafka source perform retractions on Key?

Rex Fenley
Hi,

I'm concerned about the impact of Kafka's log compaction when sending data between running Flink jobs.

For example, one job produces retract stream records in the sequence:
(false, (user_id: 1, state: "california")) -- retract
(true, (user_id: 1, state: "ohio")) -- append
If this is written to a Kafka topic keyed by user_id, it could end up compacted to just
(true, (user_id: 1, state: "ohio")) -- append
If some other downstream Flink job has a filter on state == "california" and reads from the Kafka stream, I assume it will miss the retract message altogether and produce incorrect results.
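
Concretely, here's a rough Flink SQL sketch of the setup I have in mind (table names and connector options are illustrative, not our actual configuration):

-- Changelog topic written by the upstream job (illustrative names).
CREATE TABLE user_states (
  user_id BIGINT,
  state   STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_states',                 -- log-compacted, keyed by user_id
  'properties.bootstrap.servers' = 'kafka:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json'               -- interprets records as changelog rows
);

-- Downstream job: if compaction already dropped the retraction of
-- (user_id: 1, state: 'california'), this filter never sees the delete
-- and keeps the stale 'california' row.
CREATE TEMPORARY VIEW californians AS
SELECT user_id, state
FROM user_states
WHERE state = 'california';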

Is this true? How do we prevent this from happening? We need to use compaction since all our jobs are based on CDC and we can't just drop data after x number of days.

Thanks

--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com


Re: Does the Kafka source perform retractions on Key?

Rex Fenley
Apologies, I forgot to finish. If the Kafka source performed its own retraction of the old data on the key (user_id) for every append it receives, I think that would resolve this discrepancy.

Again, is this true? Anything else I'm missing?

Thanks!




Re: Does the Kafka source perform retractions on Key?

Jan Lukavský

Hi Rex,

If I understand correctly, you are concerned about the behavior of the Kafka source in the case of a compacted topic, right? If so, this is not directly related to Flink; Flink will expose the behavior defined by Kafka. You can read about it, for instance, here [1]. TL;DR: your pipeline is guaranteed to see every record written to the topic (every single update, whether it is later "overwritten" or not) as long as it processes each record with a latency of at most 'delete.retention.ms'. This is configurable per topic, with a default of 24 hours. If you want to reprocess the data later, your consumer might see only the resulting compacted ("retracted") stream, and not every record actually written to the topic.
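
To illustrate with a rough Flink SQL sketch (the topic name and connector options are only an example): a job that replays the topic from the earliest offset sees whatever has survived compaction at that point, not the full history.

-- Example only: replaying a compacted changelog topic from the beginning.
CREATE TABLE user_states_replay (
  user_id BIGINT,
  state   STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_states',                  -- a log-compacted topic
  'properties.bootstrap.servers' = 'kafka:9092',
  'scan.startup.mode' = 'earliest-offset',  -- replay from the start of the log
  'format' = 'debezium-json'
);
-- A consumer that keeps its lag within the window described above still sees
-- every individual update; a replay started later only sees the records that
-- survived compaction up to that point.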

 Jan

[1] https://medium.com/swlh/introduction-to-topic-log-compaction-in-apache-kafka-3e4d4afd2262



Re: Does the Kafka source perform retractions on Key?

Arvid Heise-4
Jan's response is correct, but I'd like to emphasize the impact on a Flink application.

If the compaction happens before the data arrives in Flink, the intermediate updates are lost and just the final result appears.
Also, if you restart your Flink application and reprocess older data, it will naturally see only the compacted data, save for the active segment.

So how do you make it deterministic? Simply drop topic compaction. If the data is coming from CDC and you want to process and produce changelog streams over several applications, you probably don't want to use log compaction anyway.

Log compaction only makes sense for a snapshot topic that reflects the current state (a KTable), where you no longer think in CDC updates but just in final records, like
(user_id: 1, state: "california")
(user_id: 1, state: "ohio")

Usually, if you use CDC in your company, each application is responsible for building its own current model by tapping into the relevant changes. Log-compacted topics would then only appear at the end of processing, when you hand data over to non-analytical applications, such as web apps, as in the sketch below.
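
As a rough sketch (Flink SQL; all names are illustrative, and 'user_states' stands in for whatever changelog the job produces): such a snapshot topic maps naturally onto the upsert-kafka connector, which keys the topic by primary key, so compaction only ever drops superseded versions of the same key.

-- Sketch only: a compacted "current state" topic at the end of the pipeline.
CREATE TABLE users_current (
  user_id BIGINT,
  state   STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'users-current',
  'properties.bootstrap.servers' = 'kafka:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

-- The changelog collapses to one row per user_id; compacting this topic is
-- safe because only older values of the same key are discarded.
INSERT INTO users_current
SELECT user_id, state FROM user_states;  -- 'user_states' is an assumed changelog table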



Re: Does the Kafka source perform retractions on Key?

Rex Fenley
All of our Flink jobs are (currently) used for web applications at the end of the day. We see a lot of latency spikes from record amplification, and we were at first hoping we could pass intermediate results through Kafka and compact them to reduce the record amplification, but then it hit me that this might be an issue.

Thanks for the detailed explanation, though it seems like we'll need to look for a different solution or compact only records we know will never mutate.



Re: Does the Kafka source perform retractions on Key?

Arvid Heise-4
Just to clarify: intermediate topics should, in most cases, not be compacted, for exactly these reasons, if your application depends on all of the intermediate data. For the final topic, compaction makes sense. If you also consume intermediate topics from a web application, one solution is to split each of them into two topics (like topic-raw for Flink and topic-compacted for the applications), as sketched below, and live with some amplification.
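
A rough sketch of that split (Flink SQL; all names are illustrative, and 'intermediate_result' stands in for whatever the job computes):

-- Uncompacted changelog for downstream Flink jobs (cleaned up by retention only).
CREATE TABLE topic_raw (
  user_id BIGINT,
  state   STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic-raw',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'debezium-json'      -- encodes inserts/updates/deletes as changelog records
);

-- Compacted, keyed view of the same data for the serving applications.
CREATE TABLE topic_compacted (
  user_id BIGINT,
  state   STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'topic-compacted',
  'properties.bootstrap.servers' = 'kafka:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

-- Both sinks are fed from the same intermediate result (two INSERTs for
-- brevity; in practice they could be grouped into one statement set).
INSERT INTO topic_raw       SELECT user_id, state FROM intermediate_result;
INSERT INTO topic_compacted SELECT user_id, state FROM intermediate_result;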



Re: Does the Kafka source perform retractions on Key?

Rex Fenley
Does this also imply that it's not safe to compact the initial topic where the data is coming from Debezium? I'd have thought that Flink's Kafka source would emit retractions for any existing data with a primary key as new data with the same pk arrives (in our case all data has primary keys). I guess that goes back to my original question: is this not what the Kafka source does? Is there no way to make that happen?

We really can't live with the record amplification; it's sometimes nonlinear and randomly kills RocksDB performance.



Re: Does the Kafka source perform retractions on Key?

Rex Fenley
Digging around, it looks like Upsert Kafka, which requires a primary key, will actually do what I want and works with compaction, but it doesn't look compatible with the Debezium format? Is this on the roadmap?

In the meantime, we're considering consuming from the Debezium Kafka topic (still compacted), writing directly to an Upsert Kafka sink, and then reading right back out of a corresponding Upsert Kafka source, roughly as sketched below. Since that little roundabout keys all changes by primary key, it should give us a compacted topic to start from. Once we get that working, we can probably do the same thing with intermediate Flink jobs too.
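
Roughly like this (Flink SQL; topic and field names are made up for illustration):

-- CDC topic as written by Debezium (illustrative names).
CREATE TABLE users_dbz (
  user_id BIGINT,
  state   STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'dbz.public.users',
  'properties.bootstrap.servers' = 'kafka:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json'
);

-- Upsert topic keyed by the primary key; safe to compact.
CREATE TABLE users_upsert (
  user_id BIGINT,
  state   STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'users-upsert',
  'properties.bootstrap.servers' = 'kafka:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

-- The "roundabout": forward the Debezium changelog into the upsert topic.
-- Jobs then read 'users_upsert', which emits per-key inserts, updates, and deletes.
INSERT INTO users_upsert
SELECT user_id, state FROM users_dbz;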

Would appreciate any feedback on this approach, thanks!



Re: Does the Kafka source perform retractions on Key?

Arvid Heise-4
Hi Rex,

Imho, log compaction and CDC for historic processing are incompatible on a conceptual level. Let's take this example:

topic: party membership
+(1, Dem, 2000)
-(1, Dem, 2009)
+(1, Gop, 2009)
Where 1 is the id of a real person.

Now, let's consider you want to count memberships retroactively for each year.
You'd get 1 Dem and 0 GOP for 2000-2009, and 1 GOP and 0 Dem for 2009+.

Now, consider you have log compaction with a compaction period of less than a year.
You'd get 0 Dem and 0 GOP for 2000-2009, and only the real result for 2009+ (or, in general, from the time of the latest change).

Let's take another example:
+(2, Dem, 2000)
-(2, Dem, 2009)

With log compaction, you'd get -1, 0, or 1 Dem and 0 GOP for 2009+, depending on how well your application can deal with incomplete logs. Let's say your application is simply adding and subtracting retractions: you'd get -1. If your application ignores deletions without a prior insertion (which needs to be tracked for each person), you'd get 0. If your application is not looking at the retraction type at all, you'd get 1.
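
To make this concrete, here is a rough Flink SQL sketch of the naive "adding and subtracting retractions" variant (table and field names are made up; 'memberships' is assumed to expose the changelog above):

-- Membership changelog (sketch; debezium-json stands in for whatever CDC format is used).
CREATE TABLE memberships (
  person_id BIGINT,
  party     STRING,
  since     INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'memberships',
  'properties.bootstrap.servers' = 'kafka:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json'
);

-- Naive count: on the full log the insert and delete for person 2 cancel out;
-- on the compacted log only the delete remains and the Dem count ends up at -1.
SELECT party, COUNT(*) AS members
FROM memberships
GROUP BY party;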

As you can see, you need to be really careful to craft your application correctly. The correct result will only be achieved through the most complex application (aggregating state for each person and dealing with incomplete information). This is completely independent of Kafka, Debezium, or Flink.

---

However, as Jan pointed out: if you never process data that has already been compacted away, then your application is correct. But then the question is: what is the benefit of keeping data in the topic that is older than the compaction? The value is close to zero, as you can't really use it for CDC processing (again, independent of Flink).

Consequently, instead of compaction, I'd go with a lower retention policy and offload the data to S3 for historic (re)processing (afaik the cloud offering of Confluent finally has automatic offloading, but you can also build it yourself). Then you only need to ensure that your application never accesses data that has been deleted because of the retention time. In general, it's better to choose a technology such as Pulsar with tiered storage, which gives you exactly what you want with low overhead: unlimited retention without compaction, but without holding much data in expensive storage (SSD), by offloading automatically to cold storage.

If this doesn't work for you, then please share your requirements with me and explain why you'd need compaction plus a different retention for source/intermediate topics.

For the final topic, in my experience, a real key/value store works much better than log-compacted topics for serving web applications. Confluent's marketing strongly pushes the idea that Kafka can be used as a database and as a key/value store, while in reality it's "just" a good distributed log. I can provide pointers that discuss the limitations if there is interest. Also note that the final topic should not be in CDC format anymore (so no retractions); it should just contain the current state. For both examples together it would be
1, Gop, 2009
and no record for person 2.




Re: Does the Kafka source perform retractions on Key?

Rex Fenley
Hi Arvid,

I really appreciate the thorough response, but I don't think this contradicts our use case. In servicing web applications we're doing nothing more than taking data from the giant databases we use, performing joins and denormalizing aggregations strictly for performance reasons (joining across a lot of tables at query time is slow), and putting the specified results into another database connected to the specified web server. Our Flink jobs are purely used for up-to-date materialized views. We don't care about historical analysis; we only care about what the exact current state of the world is.

This is why every row has a primary key, from the beginning to the end of the job (even though Flink's Table API can't seem to detect that after a lot of joins in our plan, it's logically true, since the join key will then be the pk). This is also why all we need is for the Kafka source to retract the current row for an existing primary key that's being overwritten, have that retraction propagate downstream to throw away any data transformed from that row, and then process the new row. We don't care what other data changes may have happened in between; they're not applicable to our use case.

We're using CDC as nothing more than a way to get the latest rows into Kafka in real time so they can be read by the various Flink jobs we hope to build (starting with the one we're currently working on, which has ~35 stateful operators), which then just transform the data and forward it to another database.

----

Reading the Upsert Kafka docs [1]: "In the physical operator, we will use state to know whether the key is the first time to be seen. The operator will produce INSERT rows, or additionally generate UPDATE_BEFORE rows for the previous image, or produce DELETE rows with all columns filled with values." This is how we thought the regular Kafka source actually worked, that it kept state on PKs it could retract on, because we weren't even thinking of any other use case until it hit me that it may not be true. Luckily the doc also provides an example of simply forwarding from DBZ Kafka to Upsert Kafka, so even if the DBZ Kafka source data is compacted it won't matter, since everything in the actual job reading from Upsert Kafka should then function by PK like we need. On that note, I think it may be helpful to edit the documentation to indicate that if you need stateful, PK-based Kafka consumption, it must be done via Upsert Kafka.


Again, thanks for the thorough reply, this really helped my understanding!



Re: Does the Kafka source perform retractions on Key?

Arvid Heise-4
Hi Rex,

Your initial question was about the impact of compaction on your CDC application logic. I have been (unsuccessfully) trying to tell you that you do not need compaction and it's counterproductive.

If you are not rereading the topics, why do you compact them? It costs compute time and I/O on the Kafka brokers (both of which are very valuable) and does not give you anything that an appropriate retention time wouldn't give you (namely lower SSD usage). It also makes the mental model more complicated. Aggressive compaction combined with a larger backlog (compaction time < application failure/restart/upgrade time) would lead to incorrect results (in the same way that an inappropriate retention period may cause data loss).

The only use case for log compaction is if you're using a Kafka topic as a key/value store to serve a web application (in which case it's usually better to use a real key/value store); but then you don't need retractions anymore, you'd simply overwrite the actual values or use tombstone records for deletions.

If you consume the same topic both from web applications and from Flink, and don't want to use another technology as the key/value store, then log compaction of retractions kinda makes sense to kill two birds with one stone. However, you have to live with the downsides on the Flink side (correctness depends on the compaction lag being larger than any downtime) and on the web application side (it has to deal with retractions even though they don't make any sense at that level). Again, a cloud-native key/value store would perform much better, be much cheaper with better SLAs, and solve all the issues on the Flink side (final note: this is independent of the technology; any stream processor will encounter the same issue, as it's a conceptual mismatch).

On Sat, Feb 27, 2021 at 8:24 PM Rex Fenley <[hidden email]> wrote:
Hi Arvid,

I really appreciate the thorough response but I don't think this contradicts our use case. In servicing web applications we're doing nothing more than taking data from giant databases we use, and performing joins and denormalizing aggs strictly for performance reasons (joining across a lot of stuff on query time is slow) and putting specified results into another database connected to the specified web server. Our Flink jobs are purely used for up-to-date materialized views. We don't care about historical analysis, we only care about what the exact current state of the world is.

This is why every row has a primary key, from beginning to end of the job (even though Flink's table api can't seem to detect that after a lot of joins in our plan, but it's logically true since then the join key will be pk). This is also why all we need to do is retract the current row from the Kafka source on the existing primary key that's being overwritten, have that retract propagate downstream to throw away any data transformed from that row, and then process the new row. We don't care what other data changes may have happened in between, it's not applicable to our use case.

We're using CDC for nothing more than a way to get the latest rows in real time into Kafka so they can be read by various Flink jobs we hope to build (starting with the one we're currently working on that has ~35 stateful operators) which then just transform and forward to another database.

----

Reading the Upsert Kafka docs [1] "In the physical operator, we will use state to know whether the key is the first time to be seen. The operator will produce INSERT rows, or additionally generate UPDATE_BEFORE rows for the previous image, or produce DELETE rows with all columns filled with values." This is how we thought the regular Kafka source actually worked, that it had state on PKs it could retract on, because we weren't even thinking of any other use case until it hit me that may not be true. Luckily the doc also provides an example of simply forwarding from DBZ Kafka to Upsert Kafka, even if DBZ Kafka source data is compacted it won't matter since now everything in the actual job reading from Upsert Kafka should function by PK like we need. On that note, I think it may be helpful to edit the documentation to indicate that if you need stateful PK based Kafka consumption it must be via Upsert Kafka.


Again, thanks for the thorough reply, this really helped my understanding!

On Sat, Feb 27, 2021 at 4:02 AM Arvid Heise <[hidden email]> wrote:
Hi Rex,

imho log compaction and CDC for historic processes are incompatible on conceptual level. Let's take this example:

topic: party membership
+(1, Dem, 2000)
-(1, Dem, 2009)
+(1, Gop, 2009)
Where 1 is the id of a real person.

Now, let's consider you want to count memberships retroactively each year.
You'd get 2000-2009, 1 Dem and 0 GOP and 2009+ 1 GOP and 0 Dem.

Now, consider you have log compaction with a compaction period <1 year.
You'd get 2000-2009, 0 Dem and 0 GOP and only the real result for 2009+ (or in general the time at the latest change).

Let's take another example:
+(2, Dem, 2000)
-(2, Dem, 2009)

With log compaction, you'd get -1/0/-1 Dem and 0 GOP for 2009+ depending on how well your application can deal with incomplete logs. Let's say your application is simply adding and subtracting retractions, you'd get -1. If your application is ignoring deletions without insertions (needs to be tracked for each person), you'd get 0. If your application is not looking at the retraction type, you'd get 1.

As you can see, you need to be really careful to craft your application correctly. The correct result will only be achieved through the most complex application (aggregating state for each person and dealing with incomplete information). This is completely independent of Kafka, Debezium, or Flink.

---

However, as Jan pointed out: If you don't process data before compaction, then your application is correct. Now, then the question is what's the benefit of having data in the topic older than the compaction? The value is close to 0 as you can't really use it for CDC processing (again independent of Flink).

Consequently, instead of compaction, I'd go with a lower retention policy and offload the data to s3 for historic (re)processing (afaik the cloud offering of confluent finally has automatic offloading but you can also build it yourself). Then you only need to ensure that your application is never accessing data that is deleted because of the retention time. In general, it's better to choose a technology such as Pulsar with tiered storage that gives you exactly what you want with low overhead: you need unlimited retention without compaction but without holding much data in expensive storage (SSD) by offloading automatically to cold storage.

If this is not working for you, then please share your requirements with me why you'd need compaction + a different retention for source/intermediate topics.

For the final topic, from my experience, a real key/value store works much better than log compacted topics for serving web applications. Confluent's marketing is strongly pushing that Kafka can be used as a database and as a key/value store while in reality, it's "just" a good distribution log. I can provide pointers that discuss the limitations if there is interest. Also note that the final topic should not be in CDC format anymore (so no retractions). It should just contain the current state. For both examples together it would be
1, Gop, 2009
and no record for person 2.


On Sat, Feb 27, 2021 at 3:33 AM Rex Fenley <[hidden email]> wrote:
Digging around, it looks like Upsert Kafka which requires a Primary Key will actually do what I want and uses compaction, but it doesn't look compatible with Debezium format? Is this on the roadmap?

In the meantime, we're considering consuming from the Debezium Kafka topic (still compacted), writing directly to an Upsert Kafka sink, and then reading right back out of a corresponding Upsert Kafka source. Since that little roundabout will key all changes by primary key, it should give us a compacted topic to start with. Once we get that working, we can probably do the same thing with intermediate Flink jobs too.

Would appreciate any feedback on this approach, thanks!

On Fri, Feb 26, 2021 at 10:52 AM Rex Fenley <[hidden email]> wrote:
Does this also imply that it's not safe to compact the initial topic where data is coming from Debezium? I'd have thought that Flink's Kafka source would emit retractions for any existing data with a primary key as new data with the same PK arrived (in our case all data has primary keys). I guess that goes back to my original question though: is this not what the Kafka source does? Is there no way to make that happen?

We really can't live with the record amplification, it's sometimes nonlinear and randomly kills RocksDB performance.

On Fri, Feb 26, 2021 at 2:16 AM Arvid Heise <[hidden email]> wrote:
Just to clarify: intermediate topics should in most cases not be compacted, for exactly these reasons, if your application depends on all intermediate data. For the final topic, it makes sense. If you also consume intermediate topics from a web application, one solution is to split them into two topics (e.g. topic-raw for Flink and topic-compacted for applications) and live with some amplification.

On Thu, Feb 25, 2021 at 12:11 AM Rex Fenley <[hidden email]> wrote:
All of our Flink jobs are (currently) used for web applications at the end of the day. We see a lot of latency spikes from record amplification and we were at first hoping we could pass intermediate results through Kafka and compact them to lower the record amplification, but then it hit me that this might be an issue.

Thanks for the detailed explanation, though it seems like we'll need to look for a different solution or only compact on records we know will never mutate.

On Wed, Feb 24, 2021 at 6:38 AM Arvid Heise <[hidden email]> wrote:
Jan's response is correct, but I'd like to emphasize the impact on a Flink application.

If the compaction happens before the data arrives in Flink, the intermediate updates are lost and just the final result appears.
Also if you restart your Flink application and reprocess older data, it will naturally only see the compacted data save for the active segment.

So how do you make it deterministic? Simply drop topic compaction. If the data is coming from CDC and you want to process and produce changelog streams over several applications, you probably don't want to use log compaction anyway.

Log compaction only makes sense for the snapshot topic that holds the current state (KTable), where you no longer think in terms of CDC updates but only in final records, like
(user_id: 1, state: "california")
(user_id: 1, state: "ohio")

Usually, if you use CDC in your company, each application is responsible for building its own current model by tapping into the relevant changes. Log-compacted topics would then only appear at the end of processing, when you hand data over to non-analytical applications, such as web apps.


Re: Does the Kafka source perform retractions on Key?

Rex Fenley
Hi Arvid,

>If you are not rereading the topics, why do you compact them?
We are rereading the topics, at any time we might want a completely different materialized view for a different web service for some new application feature. Other jobs / new jobs need to read all the up-to-date rows from the databases.

>correctness depends on compaction < downtime
I still don't see how this is the case if everything just needs to be overwritten by primary key. To re-emphasize, we do not care about historical data.

>Again, a cloud-native key/value store would perform much better and be much cheaper with better SLAs
Is there a cloud-native key/value store which can read from a Postgres WAL or MySQL binlog and then keep an up-to-date read marker for any materialization consumers downstream besides Kafka + Debezium?

Appreciate all the feedback, though hopefully we can get closer to the same mental model. If there's really a better alternative here I'm all for it!


On Sat, Feb 27, 2021 at 11:50 AM Arvid Heise <[hidden email]> wrote:
Hi Rex,

Your initial question was about the impact of compaction on your CDC application logic. I have been (unsuccessfully) trying to tell you that you do not need compaction and it's counterproductive.

If you are not rereading the topics, why do you compact them? It's lost compute time and I/O on the Kafka brokers (which are both very valuable) and does not give you anything that an appropriate retention time wouldn't give you (=lower SSD usage). It makes the mental model more complicated. An aggressive compaction and a larger backlog (compaction time < application failure/restart/upgrade time) would lead to incorrect results (in the same way an inappropriate retention period may cause data loss for the same reason).

The only use case for log compaction is if you're using a Kafka topic as a key/value store to serve a web application (in which case it's usually better to use a real key/value store), and even then you don't need retractions anymore; you'd simply overwrite the values or use tombstone records for deletions.

If you consume the same topic both from web applications and from Flink and don't want to introduce another technology as a key/value store, then log compaction of retractions kind of makes sense to kill two birds with one stone. However, you have to live with the downsides on the Flink side (correctness depends on compaction < downtime) and on the web application side (it has to deal with retractions even though they make no sense at that level). Again, a cloud-native key/value store would perform much better, be much cheaper, come with better SLAs, and solve all issues on the Flink side (final note: this is independent of the technology; any stream processor will run into the same issue, as it's a conceptual mismatch).


Re: Does the Kafka source perform retractions on Key?

Arvid Heise-4
> We are rereading the topics, at any time we might want a completely different materialized view for a different web service for some new application feature. Other jobs / new jobs need to read all the up-to-date rows from the databases.
> I still don't see how this is the case if everything just needs to be overwritten by primary key. To re-emphasize, we do not care about historical data.
Why are you reading from a CDC topic and not a log-compacted topic that reflects the state then? CDC is all about history and changes.

Here's an architecture that I'd imagine would work better for you:

For each SQL table (ingress layer):
SQL Table -> Debezium -> State collecting Flink job -> Kafka state topic (compacted)

Analytics (processing layer):
Kafka state topics (compacted) -> Analytical Flink job -> Kafka state topic (compacted)

For each view (egress layer):
Kafka state topics (compacted) -> Aggregating Flink job -> K/V store(s) -> Web application

The ingress layer is only there to provide you with log-compacted Kafka topics. Then you can do a bunch of analytical queries from Kafka to Kafka. Finally, you output your views to K/V stores for highly available web applications (decoupled from the processing layer).
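
To make the processing and egress layers concrete, a hedged Flink SQL sketch (all topic, table, and column names below are placeholders; the compacted state topics are modeled as upsert-kafka tables):

-- Processing layer: read a compacted state topic, derive another compacted state topic.
CREATE TABLE user_state (
  user_id BIGINT,
  state   STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'users-by-pk',
  'properties.bootstrap.servers' = 'broker:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

CREATE TABLE users_per_state (
  state      STRING,
  user_count BIGINT,
  PRIMARY KEY (state) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'users-per-state',
  'properties.bootstrap.servers' = 'broker:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

INSERT INTO users_per_state
SELECT state, COUNT(*) FROM user_state GROUP BY state;

-- Egress layer (sketch): a separate job reads 'users-per-state' the same way and writes it
-- to a key/value store (e.g. via a JDBC or Elasticsearch sink table) for the web application.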

If that's what you already have, then my apologies for not picking that up. It's really important to stress that no Kafka topic ever contains CDC data in this setup, since you are not interested in historic data. The only CDC exchange happens via Flink's Debezium connector. At this point, all discussions of this thread are resolved.




Re: Does the Kafka source perform retractions on Key?

Rex Fenley
Thanks Arvid,

I think my confusion lies in misinterpreting the meaning of CDC. We basically don't want CDC; we just use it to get data into a compacted Kafka topic that holds the current state of the world, to be consumed by multiple consumers. You have described pretty thoroughly where we want to go.

One interesting part of your architecture is this "Debezium -> State collecting Flink job". Is there a way for Debezium to write to Flink? I thought it required Kafka Connect.

Appreciate your feedback


Re: Does the Kafka source perform retractions on Key?

Arvid Heise-4
Hi Rex,

yes, you can go directly into Flink since 1.11.0 [1], but afaik only through the Table API/SQL currently (which you seem to be using most of the time anyway). I'd recommend using 1.11.1+ (some bugfixes) or even 1.12.0+ (many useful new features [2]). You can also check the main docs [3].

If you'd like more background, Marta talked about it at a higher level [4] (slides [5]), and Qingsheng and Jark covered it at a lower level as well [6].
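
If the reference here is to the flink-cdc-connectors project (which embeds the Debezium engine inside the Flink source, so no Kafka Connect is needed), the ingress source of the "state collecting" job could look roughly like this; the connector and option names come from that project, may differ between versions, and all connection values are placeholders:

-- Postgres CDC source, Debezium embedded in the Flink job (no Kafka in between).
CREATE TABLE users_source (
  user_id BIGINT,
  state   STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'db-host',
  'port' = '5432',
  'username' = 'flink',
  'password' = 'secret',          -- placeholder
  'database-name' = 'appdb',
  'schema-name' = 'public',
  'table-name' = 'users'
);

-- The state-collecting job would then just forward this into a compacted topic, e.g.:
-- INSERT INTO users_by_pk SELECT user_id, state FROM users_source;
-- where users_by_pk is an upsert-kafka table as sketched earlier in the thread.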


On Mon, Mar 1, 2021 at 8:53 PM Rex Fenley <[hidden email]> wrote:
Thanks Arvid,

I think my confusion lies in misinterpreting the meaning of CDC. We basically don't want CDC, we just use it to get data into a compacted Kafka topic where we hold the current state of the world to consume from multiple consumers. You have described pretty thoroughly where we want to go.

One interesting part of your architecture is this "Debezium -> State collecting Flink job". Is there a way for Debezium to write to Flink? I thought it required Kafka Connect.

Appreciate your feedback

On Mon, Mar 1, 2021 at 12:43 AM Arvid Heise <[hidden email]> wrote:
> We are rereading the topics, at any time we might want a completely different materialized view for a different web service for some new application feature. Other jobs / new jobs need to read all the up-to-date rows from the databases.
> I still don't see how this is the case if everything just needs to be overwritten by primary key. To re-emphasize, we do not care about historical data.
Why are you reading from a CDC topic and not a log-compacted topic that reflects the state then? CDC is all about history and changes.

What i'd imagine an architecture that would work better for you:

For each SQL table (ingress layer):
SQL Table -> Debezium -> State collecting Flink job -> Kafka state topic (compacted)

Analytics (processing layer):
Kafka state topics (compacted) -> Analytical Flink job -> Kafka state topic (compacted)

For each view (egress layer):
Kafka state topics (compacted) -> Aggregating Flink job -> K/V store(s) -> Web application

The ingress layer is only there to provide you log-compacted Kafka topics. Then you can do a bunch of analytical queries from Kafka to Kafka. Finally, you output your views to K/V stores for high-avail web applications (=decoupled from processing layer).

If that's what you already have, then my apologies for not picking that up. It's really important to stress that in this setup no Kafka topic ever contains CDC data, since you are not interested in historic data. The only CDC exchange happens through Flink's Debezium connector. At that point, all discussions of this thread are resolved.



On Sat, Feb 27, 2021 at 9:06 PM Rex Fenley <[hidden email]> wrote:
Hi Arvid,

>If you are not rereading the topics, why do you compact them?
We are rereading the topics, at any time we might want a completely different materialized view for a different web service for some new application feature. Other jobs / new jobs need to read all the up-to-date rows from the databases.

>correctness depends on compaction < downtime
I still don't see how this is the case if everything just needs to be overwritten by primary key. To re-emphasize, we do not care about historical data.

>Again, a cloud-native key/value store would perform much better and be much cheaper with better SLAs
Is there a cloud-native key/value store which can read from a Postgres WAL or MySQL binlog and then keep an up-to-date read marker for any materialization consumers downstream besides Kafka + Debezium?

Appreciate all the feedback, though hopefully we can get closer to the same mental model. If there's really a better alternative here I'm all for it!


On Sat, Feb 27, 2021 at 11:50 AM Arvid Heise <[hidden email]> wrote:
Hi Rex,

Your initial question was about the impact of compaction on your CDC application logic. I have been (unsuccessfully) trying to tell you that you do not need compaction and it's counterproductive.

If you are not rereading the topics, why do you compact them? It's lost compute time and I/O on the Kafka brokers (which are both very valuable) and does not give you anything that an appropriate retention time wouldn't give you (=lower SSD usage). It makes the mental model more complicated. An aggressive compaction and a larger backlog (compaction time < application failure/restart/upgrade time) would lead to incorrect results (in the same way an inappropriate retention period may cause data loss for the same reason).

The only use case for log compaction is if you're using a Kafka topic as a key/value store to serve a web application (in which case, it's usually better to use a real key/value store). But then you don't need retractions anymore; you'd simply overwrite the actual values or use tombstone records for deletions.

If you consume the same topic both for web applications and Flink and don't want to use another technology as a key/value store, then log compaction of retractions kinda makes sense to kill two birds with one stone. However, you have to live with the downsides on the Flink side (correctness depends on compaction < downtime) and on the web application side (dealing with retractions even though they do not make any sense at that level). Again, a cloud-native key/value store would perform much better and be much cheaper with better SLAs, and it would solve all issues on the Flink side (final note: this is independent of the technology; any stream processor will encounter the same issue, as it's a conceptual mismatch).

On Sat, Feb 27, 2021 at 8:24 PM Rex Fenley <[hidden email]> wrote:
Hi Arvid,

I really appreciate the thorough response, but I don't think this contradicts our use case. In servicing web applications we're doing nothing more than taking data from the giant databases we use, performing joins and denormalizing aggregations strictly for performance reasons (joining across a lot of tables at query time is slow), and putting the specified results into another database connected to the specified web server. Our Flink jobs are purely used for up-to-date materialized views. We don't care about historical analysis; we only care about what the exact current state of the world is.

This is why every row has a primary key, from the beginning to the end of the job (even though Flink's Table API can't seem to detect that after a lot of joins in our plan, it's logically true, since the join key will then be the PK). This is also why all we need to do is retract the current row from the Kafka source on the existing primary key that's being overwritten, have that retraction propagate downstream to throw away any data transformed from that row, and then process the new row. We don't care what other data changes may have happened in between; they're not applicable to our use case.

We're using CDC for nothing more than a way to get the latest rows in real time into Kafka so they can be read by various Flink jobs we hope to build (starting with the one we're currently working on that has ~35 stateful operators) which then just transform and forward to another database.

----

Reading the Upsert Kafka docs [1]: "In the physical operator, we will use state to know whether the key is the first time to be seen. The operator will produce INSERT rows, or additionally generate UPDATE_BEFORE rows for the previous image, or produce DELETE rows with all columns filled with values." This is how we thought the regular Kafka source actually worked, i.e. that it had state on PKs it could retract on, because we weren't even thinking of any other use case until it hit me that this may not be true. Luckily the doc also provides an example of simply forwarding from DBZ Kafka to Upsert Kafka; even if the DBZ Kafka source data is compacted it won't matter, since everything in the actual job reading from Upsert Kafka should then function by PK like we need. On that note, I think it may be helpful to edit the documentation to indicate that if you need stateful, PK-based Kafka consumption it must be via Upsert Kafka.
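To sketch what I mean in rough Flink SQL (all names are placeholders, assuming a users_state table registered over the compacted topic with 'connector' = 'upsert-kafka' and PRIMARY KEY (user_id) NOT ENFORCED):

-- Sketch only: the upsert-kafka source keeps per-key state, so when a new
-- value for user_id 1 arrives it first generates the retraction of the
-- previous image and then the new row, as the quoted docs describe.
SELECT user_id
FROM users_state
WHERE state = 'california';

-- When user 1 changes from 'california' to 'ohio', this continuous query
-- retracts user 1 from its result instead of silently keeping it.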


Again, thanks for the thorough reply, this really helped my understanding!

On Sat, Feb 27, 2021 at 4:02 AM Arvid Heise <[hidden email]> wrote:
Hi Rex,

IMHO, log compaction and CDC for historic processing are incompatible on a conceptual level. Let's take this example:

topic: party membership
+(1, Dem, 2000)
-(1, Dem, 2009)
+(1, Gop, 2009)
Where 1 is the id of a real person.

Now, let's consider you want to count memberships retroactively each year.
You'd get 1 Dem and 0 GOP for 2000-2009, and 1 GOP and 0 Dem for 2009+.

Now, consider you have log compaction with a compaction period <1 year.
You'd get 0 Dem and 0 GOP for 2000-2009, and only the real result for 2009+ (or, in general, from the time of the latest change).

Let's take another example:
+(2, Dem, 2000)
-(2, Dem, 2009)

With log compaction, you'd get -1/0/+1 Dem and 0 GOP for 2009+, depending on how well your application can deal with incomplete logs. Let's say your application is simply adding and subtracting retractions: you'd get -1. If your application ignores deletions without prior insertions (which needs to be tracked for each person), you'd get 0. If your application is not looking at the retraction type, you'd get 1.

As you can see, you need to be really careful to craft your application correctly. The correct result will only be achieved through the most complex application (aggregating state for each person and dealing with incomplete information). This is completely independent of Kafka, Debezium, or Flink.
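To make the counting concrete, here is a rough Flink SQL sketch (table and column names are invented; party_membership stands for a changelog over the insert/retract records above):

-- Sketch only: counting current members per party over the changelog.
-- On the complete changelog, inserts and retractions cancel out and the
-- counts come out right. On a compacted topic, person 2's insert may already
-- be gone while the retraction survives, so a naive add/subtract consumer
-- ends up at -1 Dem (the other two behaviors give 0 or +1, as described above).
SELECT party, COUNT(*) AS members
FROM party_membership
GROUP BY party;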

---

However, as Jan pointed out: if your application never has to read data that has already been compacted (i.e., it always consumes before compaction kicks in), then it stays correct. But then the question is: what's the benefit of having data in the topic that is older than the compaction point? The value is close to 0, as you can't really use it for CDC processing (again, independent of Flink).

Consequently, instead of compaction, I'd go with a lower retention policy and offload the data to S3 for historic (re)processing (AFAIK Confluent's cloud offering finally has automatic offloading, but you can also build it yourself). Then you only need to ensure that your application never accesses data that has been deleted because of the retention time. In general, it's better to choose a technology such as Pulsar with tiered storage, which gives you exactly what you want with low overhead: unlimited retention without compaction, but without holding much data in expensive storage (SSD), because older data is offloaded automatically to cold storage.

If this is not working for you, then please share the requirements that make you need compaction plus a different retention for source/intermediate topics.

For the final topic, from my experience, a real key/value store works much better than log-compacted topics for serving web applications. Confluent's marketing is strongly pushing that Kafka can be used as a database and as a key/value store, while in reality it's "just" a good distributed log. I can provide pointers that discuss the limitations if there is interest. Also note that the final topic should not be in CDC format anymore (so no retractions); it should just contain the current state. For both examples together it would be
1, Gop, 2009
and no record for person 2.


On Sat, Feb 27, 2021 at 3:33 AM Rex Fenley <[hidden email]> wrote:
Digging around, it looks like Upsert Kafka, which requires a primary key, will actually do what I want and uses compaction, but it doesn't look compatible with the Debezium format? Is this on the roadmap?

In the meantime, we're considering consuming from Debezium Kafka (still compacted) and then writing directly to an Upsert Kafka sink, then reading right back out of a corresponding Upsert Kafka source. Since that little roundabout will key all changes by primary key, it should give us a compacted topic to start from. Once we get that working we can probably do the same thing with intermediate Flink jobs too.

Would appreciate any feedback on this approach, thanks!

On Fri, Feb 26, 2021 at 10:52 AM Rex Fenley <[hidden email]> wrote:
Does this also imply that it's not safe to compact the initial topic where data is coming from Debezium? I'd think that Flink's Kafka source would emit retractions on any existing data with a primary key as new data with the same PK arrived (in our case all data has primary keys). I guess that still goes back to my original question, however: is this not what the Kafka source does? Is there no way to make that happen?

We really can't live with the record amplification; it's sometimes nonlinear and randomly kills RocksDB performance.

On Fri, Feb 26, 2021 at 2:16 AM Arvid Heise <[hidden email]> wrote:
Just to clarify, intermediate topics should in most cases not be compacted, for exactly these reasons, if your application depends on all intermediate data. For the final topic, it makes sense. If you also consume intermediate topics for a web application, one solution is to split them into two topics (like topic-raw for Flink and topic-compacted for applications) and live with some amplification.

On Thu, Feb 25, 2021 at 12:11 AM Rex Fenley <[hidden email]> wrote:
All of our Flink jobs are (currently) used for web applications at the end of the day. We see a lot of latency spikes from record amplification and we were at first hoping we could pass intermediate results through Kafka and compact them to lower the record amplification, but then it hit me that this might be an issue.

Thanks for the detailed explanation, though it seems like we'll need to look for a different solution or only compact on records we know will never mutate.

On Wed, Feb 24, 2021 at 6:38 AM Arvid Heise <[hidden email]> wrote:
Jan's response is correct, but I'd like to emphasize the impact on a Flink application.

If the compaction happens before the data arrives in Flink, the intermediate updates are lost and just the final result appears.
Also if you restart your Flink application and reprocess older data, it will naturally only see the compacted data save for the active segment.

So how do you make it deterministic? Simply drop topic compaction. If the data is coming from CDC and you want to process and produce changelog streams over several applications, you probably don't want to use log compaction anyway.

Log compaction only makes sense in the snapshot topic that displays the current state (KTable), where you don't think in CDC updates anymore but just final records, like
(user_id: 1, state: "california")
(user_id: 1, state: "ohio")

Usually, if you use CDC in your company, each application is responsible for building its own current model by tapping into the relevant changes. Log-compacted topics would then only appear at the end of processing, when you hand the data over to non-analytical applications, such as web apps.

On Wed, Feb 24, 2021 at 10:01 AM Jan Lukavský <[hidden email]> wrote:

Hi Rex,

If I understand correctly, you are concerned about the behavior of the Kafka source in the case of a compacted topic, right? If that is the case, then this is not directly related to Flink; Flink will expose the behavior defined by Kafka. You can read about it for instance here [1]. TL;DR - your pipeline is guaranteed to see every record written to the topic (every single update, be it later "overwritten" or not) if it processes the record with a latency of at most 'delete.retention.ms'. This is configurable per topic - default 24 hours. If you want to reprocess the data later, your consumer might see only the resulting compacted ("retracted") stream, and not every record actually written to the topic.

 Jan

[1] https://medium.com/swlh/introduction-to-topic-log-compaction-in-apache-kafka-3e4d4afd2262

On 2/24/21 3:14 AM, Rex Fenley wrote:
Apologies, forgot to finish. If the Kafka source performs its own retractions of old data on key (user_id) for every append it receives, it should resolve this discrepancy I think.

Again, is this true? Anything else I'm missing?

Thanks!


On Tue, Feb 23, 2021 at 6:12 PM Rex Fenley <[hidden email]> wrote:
Hi,

I'm concerned about the impacts of Kafka's compactions when sending data between running flink jobs.

For example, one job produces retract stream records in sequence of
(false, (user_id: 1, state: "california")) -- retract
(true, (user_id: 1, state: "ohio")) -- append
Which is consumed by Kafka and keyed by user_id, this could end up compacting to just
(true, (user_id: 1, state: "ohio")) -- append
If some other downstream Flink job has a filter on state == "california" and reads from the Kafka stream, I assume it will miss the retract message altogether and produce incorrect results.

Is this true? How do we prevent this from happening? We need to use compaction since all our jobs are based on CDC and we can't just drop data after x number of days.

Thanks

--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US