Hi Squirrels,
I have some trouble expressing my use case in Flink terms, so I am asking for your help: I have five million documents and fourteen million search terms. For each search term I want to know in how many documents it occurs. So basically a `grep` with very many search terms.

My current approach is to broadcast the search terms to all nodes, then, at each node, load them into a prefix tree (trie), which allows me to do the contains check faster, and then process a share of the documents at each node. However, not all search terms fit into main memory at each node (I already set taskmanager.memory.fraction to 0.5), so I basically need to run three batches with one third of the search terms each.

Questions:

(1) Is there a better way to express this in Flink? Distributing nine million search terms (around 300 MB) at a time to ten nodes does not seem like the best idea. Also, the documentation says "As the content of broadcast variables is kept in-memory on each node, it should not become too large", but a simple join would be too big and would not allow me to use a tree structure.

(2) Is there a way to execute the three batches inside one Flink program, one after another? Currently, I need to run my program three times with a different parameter, download from hdfs, concat, upload to hdfs ... to get my final result. The three batches must not run in parallel, as I need the available memory for the tree and get memory exceptions otherwise.

(3) [May be obsolete if there is a better solution in (1)]: In the broadcast scenario, I guess (?) I end up storing the search terms twice: once in the broadcast data set and once in my tree data structure. Is there a way to free the broadcast store on each node after I have copied it into my data structure? This would free some more memory.

Thank you for any help,
Stefan
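A plain-Java sketch of the trie-based containment check described above (class and method names are illustrative, not from the original mail or from Flink; matching is exact substring matching):

```java
import java.util.*;

// Minimal prefix tree (trie) for checking many search terms against one
// document, as sketched in the mail above. Pure JDK, no Flink involved.
class TermTrie {
    private final Map<Character, TermTrie> children = new HashMap<>();
    private String term; // non-null iff a search term ends at this node

    void add(String t) {
        TermTrie node = this;
        for (char c : t.toCharArray()) {
            node = node.children.computeIfAbsent(c, k -> new TermTrie());
        }
        node.term = t;
    }

    // Returns the distinct search terms occurring in the document:
    // from every start position, walk the trie as far as it matches.
    Set<String> termsIn(String document) {
        Set<String> found = new HashSet<>();
        for (int i = 0; i < document.length(); i++) {
            TermTrie node = this;
            for (int j = i; j < document.length(); j++) {
                node = node.children.get(document.charAt(j));
                if (node == null) break;
                if (node.term != null) found.add(node.term);
            }
        }
        return found;
    }
}
```

Per document, the returned set contributes a count of 1 for each contained term; summing these (term, 1) pairs over all documents gives the per-term document counts Stefan asks for.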
Hi Stefan,
Flink uses only one broadcast variable for all parallel tasks on one machine. Flink can also load the broadcast variable into a custom data structure. Have a look at the getBroadcastVariableWithInitializer() method:

/**
 * Returns the result bound to the broadcast variable identified by the
 * given {@code name}. The broadcast variable is returned as a shared data structure
 * that is initialized with the given {@link BroadcastVariableInitializer}.
 * <p>
 * IMPORTANT: The broadcast variable data structure is shared between the parallel
 * tasks on one machine. Any access that modifies its internal state needs to
 * be manually synchronized by the caller.
 *
 * @param name The name under which the broadcast variable is registered.
 * @param initializer The initializer that creates the shared data structure of the broadcast
 *                    variable from the sequence of elements.
 * @return The broadcast variable, materialized as a list of elements.
 */
<T, C> C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer<T, C> initializer);

Right now, there is no easy way to run multiple tasks one after the other that I am aware of. However, we are working on materializing intermediate results. Once this feature is available, it should be easy to run the grep steps one by one.

Cheers,
Fabian
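A sketch of what such an initializer could look like for the search terms. The single-method interface is reproduced inline so the snippet compiles without Flink on the classpath; in a real job you would implement org.apache.flink.api.common.functions.BroadcastVariableInitializer instead and pass it to getBroadcastVariableWithInitializer() from a rich function's open() method. A TreeSet stands in here for the trie:

```java
import java.util.*;

// Single-method interface with the same shape as Flink's
// BroadcastVariableInitializer, reproduced so the sketch is self-contained.
interface BroadcastVariableInitializer<T, C> {
    C initializeBroadcastVariable(Iterable<T> data);
}

// Builds the shared lookup structure from the broadcast search terms once
// per machine; all parallel tasks on that machine then read it.
class TermSetInitializer
        implements BroadcastVariableInitializer<String, NavigableSet<String>> {
    @Override
    public NavigableSet<String> initializeBroadcastVariable(Iterable<String> terms) {
        NavigableSet<String> set = new TreeSet<>();
        for (String t : terms) {
            set.add(t);
        }
        // Wrapped read-only, so no manual synchronization is needed afterwards.
        return Collections.unmodifiableNavigableSet(set);
    }
}
```

Because the structure is built directly from the broadcast iterable and shared per machine, this also addresses question (3): the terms are not kept a second time per task.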
Concerning your question of how to run the programs one after another: in the main method of the program, you can simply put a loop around the part between "getExecutionEnvironment()" and "env.execute()". That way, you trigger the programs one after another. On Wed, Feb 4, 2015 at 9:34 PM, Fabian Hueske <[hidden email]> wrote:
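A sketch of that driver loop (paths, class names, and the per-batch job body are hypothetical; the Flink calls are shown as comments so the snippet stands alone, while the batch-splitting helper is plain Java):

```java
import java.util.*;

// Runs one Flink job per batch of search terms, sequentially, so only one
// batch's trie is in memory at a time. splitIntoBatches divides n items
// into `parts` contiguous ranges [start, end).
class BatchDriver {
    static int[][] splitIntoBatches(int n, int parts) {
        int[][] ranges = new int[parts][2];
        for (int p = 0; p < parts; p++) {
            ranges[p][0] = (int) ((long) n * p / parts);
            ranges[p][1] = (int) ((long) n * (p + 1) / parts);
        }
        return ranges;
    }

    public static void main(String[] args) throws Exception {
        for (int batch = 0; batch < 3; batch++) {
            // ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // DataSet<String> terms = env.readTextFile("hdfs:///terms/batch-" + batch);
            // DataSet<String> docs  = env.readTextFile("hdfs:///documents");
            // ... broadcast terms, count matches, write one output per batch ...
            // env.execute("grep batch " + batch);
        }
    }
}
```

Each env.execute() blocks until its job finishes, so the three batches run strictly one after another and the outputs can be written to three paths and concatenated afterwards, avoiding the manual download/concat/upload cycle.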