Hello,
We're using Flink on a high-velocity data stream, and we're looking for the best way to enrich our stream using a large static source (originating from Parquet files, which are rarely updated). The enrichment source weighs a few GB, which is why we want to avoid techniques such as broadcast streams, which cannot be keyed and need to be duplicated for every Flink operator that uses them. We started looking into the possibility of merging streams with datasets, or using the Table API, but any best practice that has worked for others would be greatly appreciated.
I'm attaching a simple chart for convenience.
Thank you very much,
Nimrod.
Attachment: flink enrichment flow.png (181K)
Hi Nimrod,
One approach is as follows…
1. For the Parquet data, you could use a ContinuousFileMonitoringFunction to generate a stream of enrichment records.
2. Then use a CoMapFunction to “merge” the two connected streams (Kafka and Parquet), and output something like Tuple2<key, Either<Parquet, Kafka>>
3. In your enrichment function, based on what’s in the Either<> you’re either updating your enrichment state, or processing a record from Kafka.
But I think you might want to add a stateful function in the Parquet stream, so that you can separately track Kafka record state in the enrichment function.
There’s also the potential issue of wanting to buffer Kafka data in the enrichment function, if you need to coordinate with the enrichment data (e.g. you need to get a complete set of updated enrichment data before applying any of it to the incoming Kafka data).
-- Ken
--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra
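For readers following the thread, the enrichment function described in step 3 can be modelled roughly as below. This is only a plain-Java sketch of the logic, not Flink code: the names `Tagged`, `EnrichmentSketch` and `process` are hypothetical, the `HashMap`s stand in for Flink keyed state, and the choice to buffer an event until its key's enrichment record arrives is an assumption about the desired behaviour.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Tuple2<key, Either<Parquet, Kafka>>:
// exactly one of `enrichment` / `event` is non-null.
final class Tagged {
    final String key;
    final String enrichment; // non-null => record came from the Parquet side
    final String event;      // non-null => record came from the Kafka side

    private Tagged(String key, String enrichment, String event) {
        this.key = key;
        this.enrichment = enrichment;
        this.event = event;
    }

    static Tagged fromParquet(String key, String value) { return new Tagged(key, value, null); }
    static Tagged fromKafka(String key, String value) { return new Tagged(key, null, value); }
    boolean isEnrichment() { return enrichment != null; }
}

final class EnrichmentSketch {
    // Stands in for keyed state: the latest enrichment value per key.
    private final Map<String, String> enrichmentByKey = new HashMap<>();
    // Events that arrived before their key's enrichment record (assumed policy).
    private final Map<String, List<String>> pendingByKey = new HashMap<>();

    // Handles one merged record and returns whatever enriched output it produces.
    List<String> process(Tagged record) {
        List<String> out = new ArrayList<>();
        if (record.isEnrichment()) {
            // Parquet side: update state, then release anything waiting on this key.
            enrichmentByKey.put(record.key, record.enrichment);
            List<String> pending = pendingByKey.remove(record.key);
            if (pending != null) {
                for (String event : pending) {
                    out.add(event + "|" + record.enrichment);
                }
            }
        } else {
            // Kafka side: enrich immediately if possible, otherwise buffer.
            String enrichment = enrichmentByKey.get(record.key);
            if (enrichment == null) {
                pendingByKey.computeIfAbsent(record.key, k -> new ArrayList<>()).add(record.event);
            } else {
                out.add(record.event + "|" + enrichment);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        EnrichmentSketch fn = new EnrichmentSketch();
        System.out.println(fn.process(Tagged.fromKafka("u1", "click")));  // prints []
        System.out.println(fn.process(Tagged.fromParquet("u1", "gold"))); // prints [click|gold]
        System.out.println(fn.process(Tagged.fromKafka("u1", "view")));   // prints [view|gold]
    }
}
```

In an actual job the two maps would presumably live in checkpointed keyed state so that they survive recovery, and the pending buffer would need some bound or timeout.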
Hey Ken,
Thank you for your quick response! That definitely sounds like something worth exploring. Just a few more small questions, if that's OK.
1. You referred to the Parquet source as a "stream", but what we have is a static data source which we will always want to "query" against. What we thought about doing is to stream the entire Parquet dataset and load it into our state. Does that sound right, or is that "hacky"?
2. Can the ContinuousFileMonitoringFunction be used to track an entire directory of Parquet files? Also, we'd like it to refresh its state (= its internal data structures) every time the Parquet folder is updated, but only after all new files have been written (meaning, we'll need it to run once an update has been detected, but not right away). Is that a reasonable use case?
And thank you once again.
Nimrod.
On Sat, Jan 26, 2019 at 7:10 PM Ken Krugler <[hidden email]> wrote:
If so, then yes you can do that, but you’d obviously have to re-load it yourself, partition it yourself (since it’s also keyed, right?), etc.
You can write a custom SourceFunction, but where it gets tricky is handling failure recovery (checkpointing).
But I should also have mentioned the fundamental issue with this kind of enrichment in Flink - you can’t control the ordering of the two streams (easily), so you have to be prepared to buffer data from Kafka until you’ve got a complete set of data from Parquet.
We’d worked around a similar issue with a UnionedSources source function, but I haven’t validated that it handles checkpointing correctly.
-- Ken
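The "buffer Kafka data until the Parquet set is complete" idea can be illustrated with a small plain-Java model. It is a sketch under assumptions, not Flink code: `LoadGateSketch` and its methods are hypothetical names, and the `onLoadComplete()` signal is assumed to exist. The thread does not say how completeness would be detected, so that part is left to the caller.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class LoadGateSketch {
    // Stands in for the enrichment state loaded from Parquet.
    private final Map<String, String> enrichmentByKey = new HashMap<>();
    // {key, event} pairs held back until the load is complete, in arrival order.
    private final List<String[]> buffered = new ArrayList<>();
    private boolean loadComplete = false;

    // Parquet side: just accumulate state.
    void onEnrichment(String key, String value) {
        enrichmentByKey.put(key, value);
    }

    // Kafka side: hold the event until the full enrichment set has been seen.
    List<String> onEvent(String key, String event) {
        List<String> out = new ArrayList<>();
        if (loadComplete) {
            out.add(enrich(key, event));
        } else {
            buffered.add(new String[] {key, event});
        }
        return out;
    }

    // Assumed end-of-load signal: flush everything buffered so far.
    List<String> onLoadComplete() {
        loadComplete = true;
        List<String> out = new ArrayList<>();
        for (String[] pair : buffered) {
            out.add(enrich(pair[0], pair[1]));
        }
        buffered.clear();
        return out;
    }

    // Keys with no enrichment record get a placeholder (an arbitrary choice here).
    private String enrich(String key, String event) {
        return event + "|" + enrichmentByKey.getOrDefault(key, "<none>");
    }

    public static void main(String[] args) {
        LoadGateSketch gate = new LoadGateSketch();
        System.out.println(gate.onEvent("u1", "click")); // prints []
        gate.onEnrichment("u1", "gold");
        System.out.println(gate.onLoadComplete());       // prints [click|gold]
        System.out.println(gate.onEvent("u2", "view"));  // prints [view|<none>]
    }
}
```

The buffer here is unbounded and lives on the heap; with a few GB of enrichment data and a fast Kafka stream, a real implementation would need to keep it in checkpointed state and decide what happens if the load takes a long time.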