http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Distribute-DataSet-to-subset-of-nodes-tp2814p2866.html
Hi Fabian,
the local file problem would, however, not exist if I just copy both halves to all nodes, right?
Let's say I have a file `1st` and a file `2nd`, which I copy to all nodes.
Now with your approach from above, I do:
// helper broadcast datasets to know on which half to operate
val data1stHalf = env.fromElements("1st")
val data2ndHalf = env.fromElements("2nd")
val mapped1 = data.flatMap(yourMap).withBroadcastSet(data1stHalf, "fileName").setParallelism(5)
val mapped2 = data.flatMap(yourMap).withBroadcastSet(data2ndHalf, "fileName").setParallelism(5)
val result = mapped1.union(mapped2)
Then, in my custom flatMap implementation, I check the helper broadcast variable to know which file to load:
override def open(params: Configuration): Unit = {
  // getBroadcastVariable returns a java.util.List, so use get(0)
  val fileName = getRuntimeContext.getBroadcastVariable[String]("fileName").get(0)
  // read the file from the local filesystem which I copied there earlier
  this.data = loadFromFileIntoDatastructure("/home/data/" + fileName)
}
override def flatMap(document: Input, out: Collector[Output]): Unit = {
  // do sth. with this.data and the input document
  out.collect(this.data.process(document))
}
I think this should work, or do you see another problem here?
Which brings us to the other question:
Both halves are so large that one half of the data fits into the user-available memory on a node, but not both. So my program would probably crash with an out-of-memory error if the scheduler trusts one node so much that it executes both flatMaps there ;-).
You are saying that it is not guaranteed that all 10 nodes are used, but how likely is it that one node is given two flatMaps while another is basically idling? I have no idea about the internals, but I guess there is some heuristic inside that decides how to distribute the tasks. In the normal setup, where all 10 nodes are up, the connection is good, all nodes have the same resources available, and the input data is evenly distributed in HDFS, the default case should be to distribute to all 10 nodes, right?
I am not running in production, so for me it would be okay if this works out most of the time.
Cheers
Stefan