Expressing `grep` with many search terms in Flink


Expressing `grep` with many search terms in Flink

Stefan Bunk
Hi Squirrels,

I have some trouble expressing my use case in Flink terms, so I am asking for your help:
I have five million documents and fourteen million search terms. For each search term, I want to know in how many documents it occurs. So basically, this is a `grep` with very many search terms.

My current approach is to broadcast the search terms to all nodes, then, at each node, load them into a prefix tree (trie), which makes the contains check faster, and then process a share of the documents on each node.
However, not all search terms fit into main memory on each node (I have already set taskmanager.memory.fraction to 0.5), so I basically need to run three batches with one third of the search terms each.
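
Roughly, the per-batch job looks like this (simplified sketch; `Trie` is my own prefix-tree class, and `insert`/`matchingTerms` are just placeholder names for its methods):

DataSet<String> terms = env.readTextFile(termsPath);        // one third of the terms per run
DataSet<String> documents = env.readTextFile(documentsPath);

DataSet<Tuple2<String, Integer>> counts = documents
    .flatMap(new RichFlatMapFunction<String, Tuple2<String, Integer>>() {

        private Trie trie;

        @Override
        public void open(Configuration parameters) {
            // copy the broadcast list into the trie -- this is where the terms
            // end up being stored twice (see question (3) below)
            List<String> termList = getRuntimeContext().getBroadcastVariable("terms");
            trie = new Trie();
            for (String term : termList) {
                trie.insert(term);
            }
        }

        @Override
        public void flatMap(String document, Collector<Tuple2<String, Integer>> out) {
            // emit (term, 1) for every search term that occurs in the document
            for (String term : trie.matchingTerms(document)) {
                out.collect(new Tuple2<>(term, 1));
            }
        }
    })
    .withBroadcastSet(terms, "terms")
    .groupBy(0)
    .sum(1);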

Questions:
(1): Is there a better way to express this in Flink? Distributing the nine million search terms (around 300 MB) at a time to ten nodes does not seem like the best idea. Also, the documentation says "As the content of broadcast variables is kept in-memory on each node, it should not become too large". However, a simple join would be too big and would not allow me to use a tree structure.
(2): Is there a way to execute the three batches inside one Flink program, one after another? Currently, I need to run my program three times with a different parameter, then download from HDFS, concatenate, and upload to HDFS again to get my final result. The three batches must not run in parallel, as I need the available memory for the tree and get memory exceptions otherwise.
(3) [May be obsolete if there is a better solution in (1)]: In the broadcast scenario, I guess (?) I end up storing the search terms twice: once in the broadcast data set and once in my tree data structure. Is there a way to free the broadcast data on each node after I have copied it into my data structure? This would free some more memory.

Thank you for any help,
Stefan

Re: Expressing `grep` with many search terms in Flink

Fabian Hueske-2
Hi Stefan,

Flink keeps only one copy of a broadcast variable for all parallel tasks on the same machine, and it can also load the broadcast variable directly into a custom data structure.

Have a look at the getBroadcastVariableWithInitializer() method:

/**
 * Returns the result bound to the broadcast variable identified by the
 * given {@code name}. The broadcast variable is returned as a shared data structure
 * that is initialized with the given {@link BroadcastVariableInitializer}.
 * <p>
 * IMPORTANT: The broadcast variable data structure is shared between the parallel
 *            tasks on one machine. Any access that modifies its internal state needs to
 *            be manually synchronized by the caller.
 *
 * @param name The name under which the broadcast variable is registered;
 * @param initializer The initializer that creates the shared data structure of the broadcast
 *                    variable from the sequence of elements.
 * @return The broadcast variable, materialized as a list of elements.
 */
<T, C> C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer<T, C> initializer);
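
For your use case, you could build the trie once per machine directly from the broadcast data, roughly like this (just a sketch; `Trie` stands for your own prefix-tree class with an `insert` method):

@Override
public void open(Configuration parameters) {
    // the initializer runs only once per machine; all parallel tasks
    // on that machine then share the returned Trie instance
    trie = getRuntimeContext().getBroadcastVariableWithInitializer(
        "terms",
        new BroadcastVariableInitializer<String, Trie>() {
            @Override
            public Trie initializeBroadcastVariable(Iterable<String> terms) {
                Trie t = new Trie();
                for (String term : terms) {
                    t.insert(term);
                }
                return t;
            }
        });
}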

Right now, there is no easy way that I am aware of to run multiple batches one after the other.
However, we are working on materializing intermediate results. Once this feature is available, it should be easy to run the grep steps one by one.

Cheers, Fabian

Re: Expressing `grep` with many search terms in Flink

Stephan Ewen
Concerning your question about how to run the programs one after another:

In the core method of the program, you can simply have a loop around the part between "getExecutionEnvironment()" and "env.execute()". That way, you trigger the programs one after another.
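
For example, something like this (just a sketch; buildGrepJob() and the HDFS paths are placeholders for the job you already have):

public static void main(String[] args) throws Exception {
    for (int batch = 0; batch < 3; batch++) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // one third of the search terms per batch (placeholder paths)
        DataSet<String> terms = env.readTextFile("hdfs:///terms/batch-" + batch);
        DataSet<String> documents = env.readTextFile("hdfs:///documents");

        // buildGrepJob() is the broadcast + trie + count pipeline from your mail
        DataSet<Tuple2<String, Integer>> counts = buildGrepJob(documents, terms);
        counts.writeAsCsv("hdfs:///counts/batch-" + batch);

        // execute() blocks until this batch is finished, so the three batches
        // run strictly one after another and the memory for the trie is
        // released in between
        env.execute("grep batch " + batch);
    }
}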


