Best pattern for achieving stream enrichment (side-input) from a large static source

Nimrod Hauser
Hello,

We're using Flink on a high-velocity data stream, and we're looking for the best way to enrich it using a large static source (originating from Parquet files, which are rarely updated).

The enrichment source weighs a few GBs, which is why we want to avoid techniques such as broadcast streams, which cannot be keyed and have to be duplicated for every Flink operator that uses them.

We started looking into the possibility of merging streams with datasets, or using the Table API, but any best practice that has worked before would be greatly appreciated.

I'm attaching a simple chart for convenience.

Thank you very much,

Nimrod.

[Attachment: flink enrichment flow.png]

Re: Best pattern for achieving stream enrichment (side-input) from a large static source

Ken Krugler
Hi Nimrod,

One approach is as follows…

1. For the Parquet data, you could use a ContinuousFileMonitoringFunction to generate a stream of enrichment records.

2. Then use a CoMapFunction to “merge” the two connected streams (Kafka and Parquet), and output something like Tuple2<key, Either<Parquet, Kafka>>

3. In your enrichment function, based on what’s in the Either<> you’re either updating your enrichment state, or processing a record from Kafka.
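
Very roughly, in code (a sketch only: the EnrichmentRecord/EventRecord types, the getKey() accessors, the build*Stream() helpers and the EnrichmentFunction are hypothetical placeholders, not anything from your pipeline):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.types.Either;

// parquetStream comes from the file-monitoring source in step 1, kafkaStream is
// the high-velocity stream; both element types are hypothetical POJOs that
// expose the enrichment key via getKey().
DataStream<EnrichmentRecord> parquetStream = buildParquetStream(env);
DataStream<EventRecord> kafkaStream = buildKafkaStream(env);

// Step 2: connect the two streams and wrap each element in an Either, so a
// single downstream function can tell enrichment records and Kafka records apart.
DataStream<Tuple2<String, Either<EnrichmentRecord, EventRecord>>> merged =
    parquetStream
        .connect(kafkaStream)
        .map(new CoMapFunction<EnrichmentRecord, EventRecord,
                Tuple2<String, Either<EnrichmentRecord, EventRecord>>>() {

            @Override
            public Tuple2<String, Either<EnrichmentRecord, EventRecord>> map1(EnrichmentRecord r) {
                Either<EnrichmentRecord, EventRecord> left = Either.Left(r);
                return Tuple2.of(r.getKey(), left);
            }

            @Override
            public Tuple2<String, Either<EnrichmentRecord, EventRecord>> map2(EventRecord e) {
                Either<EnrichmentRecord, EventRecord> right = Either.Right(e);
                return Tuple2.of(e.getKey(), right);
            }
        });

// Step 3: key by the enrichment key and apply a stateful enrichment function
// that either updates its state (Left) or enriches the Kafka record (Right).
merged.keyBy(t -> t.f0)
      .process(new EnrichmentFunction());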

But I think you might want to add a stateful function in the Parquet stream, so that you can separately track Kafka record state in the enrichment function.

There’s also the potential issue of wanting to buffer Kafka data in the enrichment function, if you need to coordinate with the enrichment data (e.g. you need to get a complete set of updated enrichment data before applying any of it to the incoming Kafka data).

— Ken




--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra


Re: Best pattern for achieving stream enrichment (side-input) from a large static source

Nimrod Hauser
Hey Ken, 

Thank you for your quick response! That definitely sounds like something worth exploring.
Just a few more small questions, if that's ok.

1. You referred to the Parquet source as a "stream", but what we have is a static data source that we will always want to "query" against.
    What we thought about doing is streaming the entire Parquet dataset and loading it into our state.
    Does that sound right, or is that "hacky"?

2. Can the ContinuousFileMonitoringFunction be used to track an entire directory of Parquet files? Also, we'd like it to refresh its state (i.e. its internal data structures) every time the Parquet folder is updated, but only after all new files have been written (meaning we'll need it to run once an update has been detected, but not right away).
Is that a reasonable use case?

And thank you once again.

Nimrod.



Re: Best pattern for achieving stream enrichment (side-input) from a large static source

Ken Krugler

On Jan 26, 2019, at 1:08 PM, Nimrod Hauser <[hidden email]> wrote:


1. You referred to the Parquet source as a "stream", but what we have is a static data source that we will always want to "query" against.
    What we thought about doing is streaming the entire Parquet dataset and loading it into our state.
    Does that sound right, or is that "hacky"?

Not sure what you mean by "stream the entire Parquet dataset". Do you mean you'd load it into memory yourself, and then distribute it?

If so, then yes, you can do that, but you'd obviously have to re-load it yourself, partition it yourself (since it's also keyed, right?), and so on.

2. Can the ContinuousFileMonitoringFunction be used to track an entire directory of Parquet files?

Yes.
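
For reference, the usual way to get that behaviour is env.readFile() with PROCESS_CONTINUOUSLY, which uses the ContinuousFileMonitoringFunction under the hood. A minimal sketch, where the directory path is made up and TextInputFormat is only a stand-in for whatever Parquet input format (e.g. from the flink-parquet module) you end up using:

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Hypothetical directory; TextInputFormat is just a placeholder format here.
String enrichmentDir = "hdfs:///data/enrichment/";
TextInputFormat format = new TextInputFormat(new Path(enrichmentDir));

// PROCESS_CONTINUOUSLY wires in a ContinuousFileMonitoringFunction that
// re-scans the whole directory at the given interval and forwards splits
// for any new or modified files to the reader operators.
DataStream<String> enrichmentLines = env.readFile(
    format,
    enrichmentDir,
    FileProcessingMode.PROCESS_CONTINUOUSLY,
    60_000L);   // scan interval in milliseconds

Note that this source emits each file as soon as it's detected (and re-reads a modified file in full), which is why the "only after all new files have been written" part of your requirement doesn't fit it, as explained below.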

Also, we'd like it to refresh its state (i.e. its internal data structures) every time the Parquet folder is updated, but only after all new files have been written (meaning we'll need it to run once an update has been detected, but not right away).
Is that a reasonable use-case?

It’s a reasonable use case, but it precludes using the ContinuousFileMonitoringFunction.

You can write a custom SourceFunction, but where it gets tricky is handling failure recovery (checkpointing).

But I should also have mentioned the fundamental issue with this kind of enrichment in Flink: you can't easily control the ordering of the two streams, so you have to be prepared to buffer data from Kafka until you've got a complete set of data from Parquet.

We’d worked around a similar issue with a UnionedSources source function, but I haven’t validated that it handles checkpointing correctly.

— Ken

 
