Hi,

I saw static/dynamic lookups in Flink streaming being discussed in the Apache Flink User Mailing List archive, and then I came across FLIP-17 (Side Inputs for the DataStream API): https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API. I know there hasn't been much progress on this topic, but I still wanted to put forward my problem statement around it.

I am also looking for a dynamic lookup in Flink operators. I want to pre-fetch various data sources (DB, filesystem, Cassandra, etc.) into memory, and in addition refresh the in-memory lookup table periodically, with the period as a configurable parameter. This is what a map operator with such a lookup would look like:

-> Load in-memory lookup, start refresh timer
-> Start stream processing
-> Call lookup
-> Use lookup result in stream processing
-> Timer elapses
-> Reload lookup data source into the in-memory table
-> Continue processing

My concerns around this are:

1) Possibly storing the same copy of the data in every task slot's memory or state backend (RocksDB in my case).
2) Having a dedicated refresh thread for each subtask instance (so every TaskManager could end up running multiple refresh threads).

Am I thinking in the right direction, or missing something very obvious? Any leads are much appreciated. Thanks in advance.

Cheers,
Chirag
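For concreteness, here is a minimal sketch of the operator described above, where loadLookupTable() is a hypothetical stand-in for the actual DB/filesystem/Cassandra read. It exhibits exactly the two concerns listed: every parallel subtask holds its own copy of the table and runs its own refresh thread.

// Sketch only: loadLookupTable() is a hypothetical stand-in for the actual
// read from the external system (DB, filesystem, Cassandra, ...).
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class LookupMapFunction extends RichMapFunction<String, String> {

    private final long refreshIntervalMs;                // configurable refresh period
    private transient volatile Map<String, String> lookup;
    private transient ScheduledExecutorService refresher;

    public LookupMapFunction(long refreshIntervalMs) {
        this.refreshIntervalMs = refreshIntervalMs;
    }

    @Override
    public void open(Configuration parameters) {
        lookup = loadLookupTable();                      // initial load before processing
        refresher = Executors.newSingleThreadScheduledExecutor();
        refresher.scheduleAtFixedRate(
                () -> lookup = loadLookupTable(),        // timer elapsed: reload the table
                refreshIntervalMs, refreshIntervalMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public String map(String value) {
        String hit = lookup.get(value);                  // use lookup result in processing
        return hit != null ? hit : value;
    }

    @Override
    public void close() {
        if (refresher != null) {
            refresher.shutdownNow();                     // stop this subtask's refresh thread
        }
    }

    private Map<String, String> loadLookupTable() {
        // Hypothetical: fetch the reference data and return it as a map.
        return new ConcurrentHashMap<>();
    }
}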
Hi.
We have created our own database source that polls the data at a configured interval. We then use a co-process function: it takes two inputs, one from our database and one from our data input. It requires that you keyBy the attributes you use for the lookup in your map function. Delaying your data input until the first database lookup has completed is not simple, but one straightforward solution is to implement a delay operation, or to keep the data in your process function state until data arrives from your database stream (see the sketch below).
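A rough sketch of that buffering co-process function, with hypothetical Event, DbRecord, and EnrichedEvent types, and both input streams keyed by the lookup attribute:

// Sketch only: Event, DbRecord, and EnrichedEvent are hypothetical types.
// Both inputs must be keyed by the lookup attribute so keyed state works.
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class LookupJoinFunction extends CoProcessFunction<Event, DbRecord, EnrichedEvent> {

    private transient ValueState<DbRecord> reference; // latest DB record for this key
    private transient ListState<Event> buffered;      // events waiting for reference data

    @Override
    public void open(Configuration parameters) {
        reference = getRuntimeContext().getState(
                new ValueStateDescriptor<>("reference", DbRecord.class));
        buffered = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffered", Event.class));
    }

    @Override
    public void processElement1(Event event, Context ctx, Collector<EnrichedEvent> out)
            throws Exception {
        DbRecord ref = reference.value();
        if (ref == null) {
            buffered.add(event);                      // delay until the DB stream delivers
        } else {
            out.collect(new EnrichedEvent(event, ref));
        }
    }

    @Override
    public void processElement2(DbRecord record, Context ctx, Collector<EnrichedEvent> out)
            throws Exception {
        reference.update(record);                     // each poll overwrites the old value
        Iterable<Event> pending = buffered.get();
        if (pending != null) {
            for (Event event : pending) {             // flush anything that was waiting
                out.collect(new EnrichedEvent(event, record));
            }
            buffered.clear();
        }
    }
}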
Med venlig hilsen / Best regards Lasse Nedergaard
Hi Lasse,
One approach I’ve used in a similar situation is to have a “UnionedSource” wrapper that first emits the (bounded) data that will be loaded in-memory, and then starts running the source that emits the continuous stream of data. This outputs an Either<A, B>, which I then split; I broadcast the A records and key/partition the B records. You could do something similar, but occasionally keep checking whether there’s more <A> data, rather than assuming it’s bounded. The main issue I ran into is that checkpointing doesn’t seem possible with this setup, or at least I couldn’t think of a way to make it work properly. — Ken
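The downstream wiring looked roughly like this, where Reference, Event, and UnionedSourceFunction are hypothetical stand-ins for the bounded type, the unbounded type, and the wrapper source:

// Sketch only: Reference, Event, and UnionedSourceFunction are hypothetical.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Either;

public class UnionedSourceJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The wrapper emits all Reference records first, then switches to Events.
        DataStream<Either<Reference, Event>> unioned =
                env.addSource(new UnionedSourceFunction());

        // Replicate the bounded reference data to every downstream subtask.
        DataStream<Reference> reference = unioned
                .filter(Either::isLeft)
                .map(Either::left)
                .returns(Reference.class)
                .broadcast();

        // Partition the continuous stream by the attribute the lookup joins on.
        DataStream<Event> events = unioned
                .filter(Either::isRight)
                .map(Either::right)
                .returns(Event.class)
                .keyBy(event -> event.getKey());

        // events.connect(reference) plus a CoFlatMapFunction then does the enrichment.
        env.execute("unioned-source-sketch");
    }
}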
--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra
Thanks Lasse, that is rightly put, and it's the only solution I can think of too. The only thing I can't get my head around is using the coMap and coFlatMap functions with such a stream. Since they don't support side outputs, is there a way my lookup map/flatMap function can simply consume a stream? Ken, that's an interesting solution actually. Is there any chance you need to update the memory-loaded data too? Thanks, Chirag
Hi Chirag, Flink 1.5.0 added support for BroadcastState, which should address your requirement of replicating the data. [1] The replicated data is stored in the configured state backend, which can also be RocksDB. Regarding the reload, I would recommend Lasse's approach of a custom source that pushes data at regular intervals instead. One problem is that it is not possible to pause a stream until all data is loaded; instead, you would need to buffer the stream data in state as well and work with start and end markers on the broadcast stream. Best, Fabian
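A minimal sketch of the marker-based buffering Fabian describes, using Flink's KeyedBroadcastProcessFunction, with hypothetical Event, RefUpdate, and EnrichedEvent types (the ready flag is not checkpointed here; a full version would snapshot it):

// Sketch only: Event, RefUpdate, and EnrichedEvent are hypothetical types.
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BufferingBroadcastLookup
        extends KeyedBroadcastProcessFunction<String, Event, RefUpdate, EnrichedEvent> {

    // The replicated lookup table, kept in broadcast state on every subtask.
    public static final MapStateDescriptor<String, String> REF_STATE =
            new MapStateDescriptor<>("reference", String.class, String.class);

    private final ListStateDescriptor<Event> bufferDescriptor =
            new ListStateDescriptor<>("buffered", Event.class);

    private transient ListState<Event> buffered; // keyed: events parked while loading
    private transient boolean ready;             // set once the first end marker arrives

    @Override
    public void open(Configuration parameters) {
        buffered = getRuntimeContext().getListState(bufferDescriptor);
    }

    @Override
    public void processElement(Event event, ReadOnlyContext ctx, Collector<EnrichedEvent> out)
            throws Exception {
        if (!ready) {
            buffered.add(event);  // reference data is not complete yet: buffer in state
            return;
        }
        String ref = ctx.getBroadcastState(REF_STATE).get(event.getKey());
        out.collect(new EnrichedEvent(event, ref));
    }

    @Override
    public void processBroadcastElement(RefUpdate update, Context ctx, Collector<EnrichedEvent> out)
            throws Exception {
        BroadcastState<String, String> table = ctx.getBroadcastState(REF_STATE);
        if (update.isStartMarker()) {
            table.clear();                        // a fresh load begins
        } else if (update.isEndMarker()) {
            ready = true;
            // Flush the events of every key that were parked while loading.
            ctx.applyToKeyedState(bufferDescriptor, (key, state) -> {
                Iterable<Event> pending = state.get();
                if (pending != null) {
                    for (Event e : pending) {
                        out.collect(new EnrichedEvent(e, table.get(e.getKey())));
                    }
                    state.clear();
                }
            });
        } else {
            table.put(update.getKey(), update.getValue()); // regular reference record
        }
    }
}

The wiring would then be along the lines of events.keyBy(...).connect(refStream.broadcast(BufferingBroadcastLookup.REF_STATE)).process(new BufferingBroadcastLookup()).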
Hi Chirag, The community is also looking at an approach that involves using Bravo [1][2] to bootstrap state by loading the initial version of the state into a savepoint.
David Anderson | Training Coordinator | data Artisans
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
Thanks a lot, David and Fabian. I will give this a try. Cheers, Chirag