Fastest way for decent lookup JOIN?


Theo
Hi there,

I have the following (probably very common) use case: I have some lookup data (100 million records) which changes only slowly (in the range of a few thousand records per day). My event stream is on the order of tens of billions of events per day, and each event needs to be enriched from the 100 million record lookup source. For the JOIN, I don't need any event-time-related semantics; the newest version at the time of enrichment should simply be used.

As a user familiar with the DataStream API but not with the SQL API, I built a small MVP. I used a connected stream and put the enrichment data into keyed (heap) state. My RAM is enough to hold all the data in memory (at least once in production). I first streamed in all 100 million records, then started the performance measurement by streaming in just 3 million events to be enriched against the 100 million records. I was a bit stunned that enriching all events took about 40 seconds on my local machine. I built a similar MVP in Spark, where I put the 100 million records into a Hive table (pre-partitioned on the JOIN column) and the 3 million test events into a Parquet file, and then ran an outer join, which also took about 40 seconds on my local machine (consuming only 16GB of RAM). I had somehow expected Flink to be much faster, as it already holds the enrichment data in memory (state), and at least on localhost there is no real networking involved.
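
The core of the MVP looks roughly like this (a simplified sketch; LookupRecord, Event and EnrichedEvent are placeholder names for my actual types):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

// Both streams are keyed on the join attribute, e.g. via
// lookupStream.connect(eventStream).keyBy(l -> l.key, e -> e.key).process(...)
public class EnrichmentFunction
        extends KeyedCoProcessFunction<String, LookupRecord, Event, EnrichedEvent> {

    // Latest lookup record for the current key, held in (heap) state.
    private transient ValueState<LookupRecord> lookup;

    @Override
    public void open(Configuration parameters) {
        lookup = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lookup", LookupRecord.class));
    }

    @Override
    public void processElement1(LookupRecord record, Context ctx,
                                Collector<EnrichedEvent> out) throws Exception {
        // Only the newest version matters, so simply overwrite.
        lookup.update(record);
    }

    @Override
    public void processElement2(Event event, Context ctx,
                                Collector<EnrichedEvent> out) throws Exception {
        // Enrich with the latest known version (may be null if no match yet).
        out.collect(new EnrichedEvent(event, lookup.value()));
    }
}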

I then thought about the problems with the DataStream API: my 100 million records are read from an uncompressed CSV file which is 25GB in size. Deserialized to Java POJOs, I estimate the POJOs would take 100GB of heap space. [Actually, I ran the Spark tests with all 100 million records but this Flink test with only 20 million records because of the memory consumption, so the 100GB is an extrapolation from 20 million records taking 20GB of heap.] When I stopped parsing my enrichment data into POJOs and instead extracted only the enrichment (join) attribute, keeping the remaining part of the record as a plain string, the Java heap used was only about 25GB for all 100 million records. Not only that, my enrichment JOIN now took only 30 seconds for all records. My thought now is: should I perhaps not use the DataStream API with Java POJOs here, but rather the Flink SQL API with "Row" classes? I remember once reading a blog post about how Flink internally optimizes its data structures and can reuse certain objects when using the SQL API.
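
For illustration, the lean representation is essentially this (assuming, hypothetically, that the join attribute is the first CSV column):

import org.apache.flink.api.java.tuple.Tuple2;

public class LeanCsvParser {
    // Extract only the join key; keep the rest of the line as one raw string.
    // One String per record instead of a POJO with many fields keeps the heap
    // close to the raw file size (~25GB instead of the estimated ~100GB).
    public static Tuple2<String, String> parse(String csvLine) {
        int firstComma = csvLine.indexOf(',');
        String joinKey = csvLine.substring(0, firstComma);
        String rest = csvLine.substring(firstComma + 1);
        return Tuple2.of(joinKey, rest);
    }
}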

Before I try out several variants, my question is: what do you think is the fastest/most efficient way to enrich slowly changing data with its latest version (a processing-time temporal table JOIN), given that memory isn't a big problem once deployed to the cluster? Do you recommend using the SQL API? If so, with which type of JOIN (a processing-time temporal table join?), and can I hold the enrichment table fully in Flink managed memory (can I express this via the SQL API?), or do I need some external "LookupTableSource"? Once I run my application in the cluster, I suspect a "LookupTableSource" would introduce communication overhead compared to querying Flink state directly. If you recommend the DataStream API instead: should I read via SQL connectors and keep "Rows" in state? And what kind of performance tuning should I take into account here (object reuse, disabling chaining, ...)?
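
To make the question concrete, here is a sketch of the kind of query I have in mind (all table and column names are made up; I used the temporal table function here since, as far as I understand, it supports processing time):

import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;

public class ProcTimeTemporalJoinSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Tiny stand-ins for the 100M-record lookup source and the event stream.
        DataStream<Tuple2<String, String>> lookupStream =
                env.fromElements(Tuple2.of("k1", "attrs1"), Tuple2.of("k2", "attrs2"));
        DataStream<Tuple2<String, String>> eventStream =
                env.fromElements(Tuple2.of("k1", "payload1"), Tuple2.of("k2", "payload2"));

        Table lookup = tEnv.fromDataStream(
                lookupStream, $("l_key"), $("attributes"), $("proc_time").proctime());
        Table events = tEnv.fromDataStream(
                eventStream, $("e_key"), $("payload"), $("proc_time").proctime());
        tEnv.createTemporaryView("events", events);

        // Temporal table function keyed on the join attribute; it returns the
        // latest version of a lookup row as of the given processing time.
        TemporalTableFunction latestLookup =
                lookup.createTemporalTableFunction($("proc_time"), $("l_key"));
        tEnv.registerFunction("latest_lookup", latestLookup);

        tEnv.sqlQuery(
                "SELECT e.payload, attributes "
                + "FROM events AS e, LATERAL TABLE (latest_lookup(e.proc_time)) "
                + "WHERE e.e_key = l_key")
            .execute().print();
    }
}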

Best regards
Theo

Re: Fastest way for decent lookup JOIN?

rmetzger0
Hi Theo,

Since you are running Flink locally, it would be quite easy to attach a profiler to Flink to see where most of the CPU cycles are burned (or to check whether you are maybe IO bound). This could give us valuable data for deciding on the next steps.
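
For example, on a JDK with Flight Recorder you could capture a one-minute profile of the locally running JVM (the PID placeholder is hypothetical):

jcmd <flink-pid> JFR.start duration=60s settings=profile filename=flink-recording.jfr

You can then open the recording in JDK Mission Control to see the hot methods.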
