In-Memory Lookup in Flink Operators


In-Memory Lookup in Flink Operators

chiggi_dev
Hi,


I know we haven't made much progress on this topic, but I still wanted to put forward my problem statement.

I am also looking for a dynamic lookup in Flink operators. I want to pre-fetch data from various sources (a database, the filesystem, Cassandra, etc.) into memory. In addition, I have to refresh the in-memory lookup table periodically, with the period being a configurable parameter.

This is what a map operator would look like with lookup: 

-> Load the in-memory lookup table - start the refresh timer
-> Start stream processing
-> Call the lookup
-> Use the lookup result in stream processing
-> Timer elapsed -> reload the lookup data source into the in-memory table
-> Continue processing
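Setting Flink aside, the steps above amount to a small refresh-on-a-timer cache. The sketch below is illustrative only (the class name, the String key/value types, and the Supplier-based loader are assumptions, not anything from this thread); inside a RichMapFunction, open() would construct it and close() would shut it down.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical per-subtask lookup cache: loaded eagerly before processing
// starts, then refreshed on a configurable interval by a dedicated thread.
class RefreshingLookupCache {
    private volatile Map<String, String> table;          // current snapshot
    private final Supplier<Map<String, String>> loader;  // e.g. a DB/Cassandra read
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    RefreshingLookupCache(Supplier<Map<String, String>> loader, long refreshSeconds) {
        this.loader = loader;
        reload();  // eager initial load, so lookups work from the first event
        scheduler.scheduleAtFixedRate(this::reload,
                refreshSeconds, refreshSeconds, TimeUnit.SECONDS);
    }

    // Swap in a freshly loaded snapshot; readers never observe a partial table.
    void reload() {
        table = new ConcurrentHashMap<>(loader.get());
    }

    String lookup(String key) {
        return table.get(key);
    }

    void close() {
        scheduler.shutdownNow();
    }
}
```

Because the whole map is replaced atomically via a volatile reference, lookups during a refresh see either the old or the new snapshot, never a mix.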


My concerns around this are:

1) Possibly storing the same copy of the data in every task slot's memory or state backend (RocksDB in my case).
2) Having a dedicated refresh thread for each subtask instance (so every Task Manager could end up with multiple refresh threads).

Am I thinking in the right direction, or am I missing something very obvious? It's confusing.

Any leads are much appreciated. Thanks in advance.

Cheers, 
Chirag

Re: In-Memory Lookup in Flink Operators

Lasse Nedergaard
Hi. 

We have created our own database source that polls the data at a configured interval. We then use a CoProcessFunction with two inputs: one from our database and one from our data input. This requires that you keyBy the attributes you use for the lookup in your map function.
Delaying your data input until the first database lookup has completed is not simple, but a simple solution could be to implement a delay operation, or to keep the data in your process function until data arrives from your database stream.
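Lasse's buffering idea can be sketched outside Flink as well. This hypothetical class stands in for a CoProcessFunction: onReference plays the role of the reference-data input, onEvent the main input, and events are held back until the first reference data arrives. In a real job the buffer and the reference table would live in Flink keyed state; the method names and String records here are invented for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Sketch of a two-input enricher that buffers main-stream events
// until reference data has arrived on the other input.
class BufferingEnricher {
    private final Map<String, String> reference = new HashMap<>();
    private final Queue<String> pending = new ArrayDeque<>();
    private boolean referenceLoaded = false;

    // Corresponds to processElement2: the reference-data (database) input.
    void onReference(String key, String value) {
        reference.put(key, value);
        referenceLoaded = true;
    }

    // Corresponds to processElement1: the main input.
    // Returns enriched records once reference data exists; buffers otherwise.
    List<String> onEvent(String key) {
        List<String> out = new ArrayList<>();
        if (!referenceLoaded) {
            pending.add(key);  // in Flink this buffer would be keyed state
            return out;
        }
        while (!pending.isEmpty()) {            // flush anything held back
            String k = pending.poll();
            out.add(k + ":" + reference.getOrDefault(k, "?"));
        }
        out.add(key + ":" + reference.getOrDefault(key, "?"));
        return out;
    }
}
```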

Med venlig hilsen / Best regards
Lasse Nedergaard



Re: In-Memory Lookup in Flink Operators

Ken Krugler
Hi Lasse,

One approach I’ve used in a similar situation is to have a “UnionedSource” wrapper that first emits the (bounded) data that will be loaded in-memory, and then starts running the source that emits the continuous stream of data.

This outputs an Either&lt;A, B&gt;, which I then split, broadcasting the A records and keying/partitioning the B records.

You could do something similar, but occasionally keep checking whether there's more &lt;A&gt; data, rather than assuming it's bounded.
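A minimal sketch of Ken's pattern (the names and list-based "sources" are illustrative, not his actual code): the bounded lookup data is emitted first as Left records, then the stream as Right records; downstream code would broadcast the Lefts and key the Rights.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of a "UnionedSource": drain the bounded lookup data first,
// then emit the continuous stream. Here both inputs are finite lists;
// a real SourceFunction would keep polling the stream side.
class UnionedSource {
    // Minimal Either: Left carries a lookup record, Right a stream record.
    public static class Either<A, B> {
        public final A left;
        public final B right;
        private Either(A l, B r) { left = l; right = r; }
        public static <A, B> Either<A, B> ofLeft(A a)  { return new Either<>(a, null); }
        public static <A, B> Either<A, B> ofRight(B b) { return new Either<>(null, b); }
        public boolean isLeft() { return left != null; }
    }

    // All bounded (lookup) records precede all stream records in the output.
    public static <A, B> List<Either<A, B>> emit(List<A> bounded, List<B> stream) {
        List<Either<A, B>> out = new ArrayList<>();
        for (A a : bounded) out.add(Either.ofLeft(a));
        for (B b : stream)  out.add(Either.ofRight(b));
        return out;
    }
}
```

As Ken notes, the hard part is checkpointing this ordering guarantee, which the sketch does not attempt to address.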

The main issue I ran into is that it doesn’t seem possible to do checkpointing, or at least I couldn’t think of a way to make this work properly.

— Ken



--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra


Re: In-Memory Lookup in Flink Operators

chiggi_dev
Thanks Lasse, that's well put. That's the only solution I can think of too.

The only thing I can't get my head around is using the coMap and coFlatMap functions with such a stream. Since they don't support side outputs, is there a way my lookup map/flatMap function can simply consume a stream?

Ken, that's an interesting solution. Is there any chance you need to update the in-memory data too?

Thanks,

Chirag


Re: In-Memory Lookup in Flink Operators

Fabian Hueske-2
Hi Chirag,

Flink 1.5.0 added support for BroadcastState, which should address your requirement of replicating the data. [1]
The replicated data is stored in the configured state backend, which can also be RocksDB.

Regarding the reload, I would recommend Lasse's approach of having a custom source that pushes the data at regular intervals.
One problem is that it is not possible to pause a stream until all data is loaded. Instead, you would need to buffer that data in state as well and work with start and end markers on the broadcast stream.
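The start/end-marker protocol Fabian describes could look roughly like this. The marker strings and the key=value record format are invented for illustration: updates arriving between START and END are staged, and only a complete snapshot ever becomes visible to lookups.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of marker-delimited snapshot handling on a broadcast stream:
// records between START and END are staged, then published atomically.
class MarkerBroadcastState {
    private Map<String, String> live = new HashMap<>();     // visible to lookups
    private Map<String, String> staging = new HashMap<>();  // snapshot in progress
    private boolean inSnapshot = false;

    // Called for each record on the broadcast (reference-data) stream.
    void onBroadcast(String record) {
        if ("START".equals(record)) {
            staging = new HashMap<>();
            inSnapshot = true;
        } else if ("END".equals(record)) {
            live = staging;  // publish the complete snapshot in one step
            inSnapshot = false;
        } else if (inSnapshot) {
            String[] kv = record.split("=", 2);  // illustrative "key=value" format
            staging.put(kv[0], kv[1]);
        }
    }

    String lookup(String key) {
        return live.get(key);
    }
}
```

In a real job, live and staging would both be kept in BroadcastState so they survive failures and rescaling.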

Best, Fabian




Re: In-Memory Lookup in Flink Operators

alpinegizmo
Hi Chirag,

The community is also looking at an approach that involves using Bravo[1][2] to bootstrap state by loading the initial version of the state into a savepoint.



--
David Anderson | Training Coordinator | data Artisans
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time

Re: In-Memory Lookup in Flink Operators

chiggi_dev
Thanks a lot, David and Fabian.

I will give this a try.

Cheers,
Chirag
