Distribute DataSet to subset of nodes


Stefan Bunk
Hi!

The problem: I have 10 nodes on which I want to execute a flatMap operator on a DataSet. In the open method of the operator, some data that the operator needs is read from disk and preprocessed. The catch is that the data does not fit in memory on one node, although half of it does.
So on five of the ten nodes I stored one half of the data to be read in the open method, and on the other five nodes the other half.

Now my question: How can I distribute my DataSet, so that each element is sent once to a node with the first half of my data and once to a node with the other half?

I looked at implementing a custom partitioner, but ran into two problems:
(i) I have no mapping from the partition index I am supposed to return to the node and the data it holds. How do I know that index 5 holds one half of the data and index 6 the other half?
(ii) I do not know the index of the current node. Obviously, I want to send each DataSet element over the network only once.

Best,
Stefan

Re: Distribute DataSet to subset of nodes

Sachin Goel

Hi Stefan
Just a clarification: the output for an element based on the whole data will be the union of the outputs based on the two halves. Is this what you're trying to achieve? [It appears so, since every flatMap task will independently produce outputs.]

In that case, one solution could be to simply have two flatMap operations based on parts of the *broadcast* data set, and take a union.
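
To illustrate the condition asked about here, a minimal plain-Java sketch (no Flink involved). The `flatMap` helper and the sample strings are made up for illustration. It assumes each output depends on a single entry of the lookup data, which is what makes the union of the per-half outputs equal to the output over the whole data.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class UnionOfHalves {
    // Hypothetical stand-in for the real operator: emits "element:entry" for
    // every lookup entry that starts with the same letter as the element.
    static List<String> flatMap(List<String> elements, List<String> lookup) {
        List<String> out = new ArrayList<>();
        for (String element : elements) {
            for (String entry : lookup) {
                if (entry.charAt(0) == element.charAt(0)) {
                    out.add(element + ":" + entry);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> elements = Arrays.asList("apple", "banana");
        List<String> firstHalf = Arrays.asList("avocado", "blueberry");
        List<String> secondHalf = Arrays.asList("apricot", "cherry");

        // Lookup data as it would look if it fit on one node.
        List<String> whole = new ArrayList<>(firstHalf);
        whole.addAll(secondHalf);

        Set<String> viaWhole = new HashSet<>(flatMap(elements, whole));

        // Two independent flatMaps, one per half, followed by a union.
        Set<String> viaHalves = new HashSet<>(flatMap(elements, firstHalf));
        viaHalves.addAll(flatMap(elements, secondHalf));

        System.out.println(viaWhole.equals(viaHalves));
    }
}
```

If an output needed entries from both halves at once, this equivalence would not hold and the two-flatMap approach would miss results.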

Cheers
Sachin


Re: Distribute DataSet to subset of nodes

Sachin Goel

Of course, someone else might have better ideas regarding the partitioner. :)


Re: Distribute DataSet to subset of nodes

Fabian Hueske-2
Hi Stefan,

I agree with Sachin's approach. That should be the easiest solution and would look like:

env.setParallelism(10); // default is 10

DataSet data = env.read(...); // large data set
DataSet smallData1 = env.read(...); // read first part of small data
DataSet smallData2 = env.read(...); // read second part of small data

DataSet mapped1 = data.flatMap(yourMap).withBroadcastSet(smallData1,"data").setParallelism(5);
DataSet mapped2 = data.flatMap(yourMap).withBroadcastSet(smallData2,"data").setParallelism(5);
DataSet result = mapped1.union(mapped2);

Unless you need to split your data into too many partitions, this should work pretty well.
However, I agree that the custom partitioning function is a bit limited.

Best, Fabian



Re: Distribute DataSet to subset of nodes

Stefan Bunk
Hi,

actually, I am distributing my data before the program starts, without using broadcast sets.

However, the approach should still work, under one condition:
DataSet mapped1 = data.flatMap(yourMap).withBroadcastSet(smallData1,"data").setParallelism(5);
DataSet mapped2 = data.flatMap(yourMap).withBroadcastSet(smallData2,"data").setParallelism(5);
Is it guaranteed that this selects disjoint sets of nodes, i.e. five nodes for mapped1 and five other nodes for mapped2?

Is there any way to select the five nodes explicitly? Currently, I have stored the first half of the data on nodes 1-5 and the second half on nodes 6-10. With this approach, I guess nodes are selected randomly, so I would have to copy both halves to all of the nodes.

Best,
Stefan


Re: Distribute DataSet to subset of nodes

Fabian Hueske-2
Hi Stefan,

forcing the scheduling of tasks to certain nodes and reading files from the local file system in a multi-node setup is actually quite tricky and requires a bit of understanding of the internals.
It is possible and I can help you with that, but I would recommend using a shared filesystem such as HDFS if that is possible.

Best, Fabian


Re: Distribute DataSet to subset of nodes

Stefan Bunk
Hi Fabian,

I think we might have a misunderstanding here. I have already copied the first file to five nodes, and the second file to five other nodes, outside of Flink. In the open() method of the operator, I just read that file via normal Java means. I do not see why this is tricky or how HDFS would help here.
Then, I have a normal Flink DataSet, which I want to run through the operator (using the previously read data in the flatMap implementation). As I run the program several times, I do not want to broadcast the data every time, but rather copy it to the nodes once and be done with it.

Can you answer my question from above? If the setParallelism method works and selects five nodes for the first flatMap and five _other_ nodes for the second flatMap, that would be fine for me if there is no other easy solution.

Thanks for your help!
Best
Stefan



Re: Distribute DataSet to subset of nodes

Fabian Hueske-2
Hi Stefan,

the problem is that you cannot directly influence the scheduling of tasks to nodes, so you cannot ensure that a task can read the data you put in the local filesystems of your nodes. HDFS gives you a shared file system, which means that each node can read data from anywhere in the cluster.
I assumed the data is small enough to broadcast because you want to keep it in memory.

Regarding your question: it is not guaranteed that two different tasks, each with parallelism 5, will be distributed to all 10 nodes (even if you have only 10 processing slots).
What would work is to have one map task with parallelism 10 and a Flink setup with 10 task managers on 10 machines with only one processing slot per TM. However, you won't be able to replicate the data to both sets of maps because you cannot know which task instance will be executed on which machine (you cannot distinguish the tasks of both task sets).

As I said, reading from the local file system in a cluster and forcing task scheduling to specific nodes is quite tricky.
Cheers, Fabian


Re: Distribute DataSet to subset of nodes

Stefan Bunk
Hi Fabian,

the local file problem would not exist, however, if I just copy both halves to all nodes, right?

Let's say I have a file `1st` and a file `2nd`, which I copy to all nodes.
Now with your approach from above, I do:

// helper broadcast datasets to know on which half to operate
val data1stHalf = env.fromElements("1st")
val data2ndHalf = env.fromElements("2nd")

val mapped1 = data.flatMap(yourMap).withBroadcastSet(data1stHalf, "fileName").setParallelism(5)
val mapped2 = data.flatMap(yourMap).withBroadcastSet(data2ndHalf, "fileName").setParallelism(5)
val result = mapped1.union(mapped2)

Then, in my custom operator implementation of flatMap I check the helper broadcast data to know which file to load:
override def open(params: Configuration): Unit = {
  // getBroadcastVariable returns a java.util.List, so take the single element with get(0)
  val fileName = getRuntimeContext.getBroadcastVariable[String]("fileName").get(0)
  // read the file from the local filesystem which I copied there earlier
  this.data = loadFromFileIntoDatastructure("/home/data/" + fileName)
}
override def flatMap(document: Input, out: Collector[Output]): Unit = {
  // do sth. with this.data and the input
  out.collect(this.data.process(document))
}

I think this should work, or do you see another problem here?

Which brings us to the other question:
Both halves are so large that one half of the data fits in the memory remaining for user code on a node, but not both halves. So my program would probably crash with an out-of-memory error if the scheduler trusts one node so much that it wants to execute two flatMaps there ;-).

You are saying that it is not guaranteed that all 10 nodes are used, but how likely is it that one node is given two flatMaps and another one is basically idling? I have no idea of the internals, but I guess there is some heuristic inside which decides how to distribute. In the normal setup, where all 10 nodes are up, the connection is good, all nodes have the same resources available, and the input data is evenly distributed in HDFS, the default case should be to distribute to all 10 nodes, right?

I am not running in production, so for me it would be ok if this usually works out.

Cheers
Stefan



Re: Distribute DataSet to subset of nodes

Fabian Hueske-2
Hi Stefan,

I think I have a solution for your problem :-)

1) Distribute both parts of the small data to each machine (you have done that).
2) Your mapper should have a parallelism of 10. The tasks with ID 0 to 4 (get the ID via RichFunction.getRuntimeContext().getIndexOfThisSubtask()) read the first half, tasks 5 to 9 read the second half.
3) Feed the large input into a FlatMapper which sends out two records for each incoming record and assigns the first outgoing record a task ID in the range 0 to 4 and the second outgoing record an ID in the range 5 to 9.
4) Have a custom partitioner (DataSet.partitionCustom()) after the duplicating mapper, which partitions the records based on the assigned task ID before they go into the mapper with the other, smaller data set. A record with assigned task ID 0 will be sent to the mapper task with subtask index 0.

This setup is not very nice, but should work.
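
The routing logic of steps 2 to 4 can be sketched in plain Java, without the Flink classes. The class and method names below are hypothetical, and picking the target by hashing the record is an assumption, since the steps above only fix the ranges 0 to 4 and 5 to 9. In a real job, `partition` would be the body of a Flink `Partitioner` passed to `DataSet.partitionCustom()`, and `fileForSubtask` would be called from `open()` with the value of `getIndexOfThisSubtask()`.

```java
class HalfRouting {
    // Step 2: the lower half of the subtasks loads file "1st", the upper half "2nd".
    static String fileForSubtask(int subtaskIndex, int parallelism) {
        return subtaskIndex < parallelism / 2 ? "1st" : "2nd";
    }

    // Step 3: every record is duplicated; one copy is tagged with a task ID in
    // the lower half, the other with a task ID in the upper half.
    // Assumption: the slot inside each half is chosen by hashing the record.
    static int[] targetsFor(Object record, int parallelism) {
        int half = parallelism / 2;
        int slot = Math.floorMod(record.hashCode(), half);
        return new int[] { slot, slot + half };
    }

    // Step 4: the partitioner simply returns the task ID assigned in step 3.
    // Same shape as Flink's Partitioner.partition(key, numPartitions).
    static int partition(int assignedTaskId, int numPartitions) {
        return assignedTaskId;
    }

    public static void main(String[] args) {
        int parallelism = 10;
        for (String record : new String[] { "a", "b", "c" }) {
            for (int target : targetsFor(record, parallelism)) {
                System.out.println(record + " -> subtask " + partition(target, parallelism)
                        + " (file " + fileForSubtask(target, parallelism) + ")");
            }
        }
    }
}
```

Each record ends up at exactly one subtask holding the first half and one holding the second half, which is the property the original question asks for.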

Let me know, if you need more detail.

Cheers, Fabian


Re: Distribute DataSet to subset of nodes

Stefan Bunk
Hi Fabian,

that sounds good, thank you.

One final question: As I said earlier, this also distributes data in some unnecessary cases, say ID 4 sends data to ID 3.
Is there no way to find out the ID of the current node? I guess that number is already available on the node and just needs to be exposed somehow, right?

Cheers
Stefan



On 17 September 2015 at 18:39, Fabian Hueske <[hidden email]> wrote:
Hi Stefan,

I think I have a solution for your problem :-)

1) Distribute both parts of the small data to each machine (you have done that)
2) Your mapper should have a parallelism of 10, the tasks with ID 0 to 4 (get ID via RichFunction.getRuntimeContext().getIndexOfThisSubtask()) read the first half, tasks 5 to 9 read the second half.
3) Give the large input into a FlatMapper which sends out two records for each incoming record and assigns the first outgoing record a task ID in range 0 to 4 and the second outgoing record an ID in range 5 to 9.
4) Have a custom partitioner (DataSet.partitionCustom()) after the duplicating mapper, which partitions the records based on the assigned task Id before they go into the mapper with the other smaller data set. A record with assigned task ID 0 will be sent to the mapper task with subtask index 0.

This setup is not very nice, but should work.

Let me know, if you need more detail.

Cheers, Fabian

2015-09-16 21:44 GMT+02:00 Stefan Bunk <[hidden email]>:
Hi Fabian,

the local file problem would however not exist, if I just copy both halves to all nodes, right?

Lets say I have a file `1st` and a file `2nd`, which I copy to all nodes.
Now with your approach from above, I do:

// helper broadcast datasets to know on which half to operate
val data1stHalf = env.fromCollection("1st")
val data2ndHalf = env.fromCollection("2nd")

val mapped1 = data.flatMap(yourMap).withBroadcastSet(data1stHalf, "fileName").setParallelism(5)
val mapped2 = data.flatMap(yourMap).withBroadcastSet(data2ndHalf, "fileName").setParallelism(5)
DataSet result = mapped1.union(mapped2)

Then, in my custom operator implementation of flatMap I check the helper broadcast data to know which file to load:
override def open(params: Configuration): Unit = {
val fileName = getRuntimeContext.getBroadcastVariable[String]("fileName")(0)
// read the file from the local filesystem which I copied there earlier
this.data = loadFromFileIntoDatastructure("/home/data/" + fileName)
}
override def flatMap(document: Input, out: Collector[Output]): Unit = {
// do sth. with this.data and the input
out.collect(this.data.process(input))
}

I think this should work, or do you see another problem here?

Which brings us to the other question:
The both halves are so large, that one half of the data fits in the user-remaining memory on a node, but not both halves. So my program would probably memory-crash, if the scheduling trusts one node so much, that it wants to execute two flatMaps there ;-).

You are saying, that it is not guaranteed, that all 10 nodes are used, but how likely is it, that one node is given two flatMaps and another one is basically idling? I have no idea of the internals, but I guess there is some heuristic inside which decides how to distribute.In the normal setup that all 10 nodes are up, connection is good, all nodes have the same resources available, input data is evenly distributed in HDFS, then the default case should be to distribute to all 10 nodes, right? 

I am not running in production, so for me it would be ok, if this works out usually.

Cheers
Stefan


On 15 September 2015 at 23:40, Fabian Hueske <[hidden email]> wrote:
Hi Stefan,

the problem is that you cannot directly influence the scheduling of tasks to nodes to ensure that you can read the data that you put in the local filesystems of your nodes. HDFS gives a shared file system which means that each node can read data from anywhere in the cluster.
I assumed the data is small enough to broadcast because you want to keep it in memory.

Regarding your question. It is not guaranteed that two different tasks, each with parallelism 5, will be distributed to all 10 nodes (even if you have only 10 processing slots).
What would work is to have one map task with parallelism 10 and a Flink setup with 10 task managers on 10 machines with only one processing slot per TM. However, you won't be able to replicate the data to both sets of maps because you cannot know which task instance will be executed on which machine (you cannot distinguish the tasks of both task sets).

As I said, reading from local file system in a cluster and forcing task scheduling to specific nodes is quite tricky.
Cheers, Fabian

2015-09-15 23:15 GMT+02:00 Stefan Bunk <[hidden email]>:
Hi Fabian,

I think we might have a misunderstanding here. I have already copied the first file to five nodes and the second file to five other nodes, outside of Flink. In the open() method of the operator, I just read that file via normal Java means. I do not see why this is tricky or how HDFS should help here.
Then, I have a normal Flink DataSet, which I want to run through the operator (using the previously read data in the flatMap implementation). As I run the program several times, I do not want to broadcast the data every time, but rather just copy it to the nodes once and be done with it.

Can you answer my question from above? If the setParallelism method works and selects five nodes for the first flatMap and five _other_ nodes for the second flatMap, that would be fine for me if there is no other easy solution.

Thanks for your help!
Best
Stefan


On 14 September 2015 at 22:28, Fabian Hueske <[hidden email]> wrote:
Hi Stefan,

forcing the scheduling of tasks to certain nodes and reading files from the local file system in a multi-node setup is actually quite tricky and requires a bit of understanding of the internals.
It is possible and I can help you with that, but I would recommend using a shared filesystem such as HDFS if that is possible.

Best, Fabian

2015-09-14 19:16 GMT+02:00 Stefan Bunk <[hidden email]>:
Hi,

actually, I am distributing my data before the program starts, without using broadcast sets.

However, the approach should still work, under one condition:
DataSet mapped1 = data.flatMap(yourMap).withBroadcastSet(smallData1,"data").setParallelism(5);
DataSet mapped2 = data.flatMap(yourMap).withBroadcastSet(smallData2,"data").setParallelism(5);
Is it guaranteed that this selects a disjoint set of nodes, i.e. five nodes for mapped1 and five other nodes for mapped2?

Is there any way of selecting the five nodes concretely? Currently, I have stored the first half of the data on nodes 1-5 and the second half on nodes 6-10. With this approach, I guess, nodes are selected randomly so I would have to copy both halves to all of the nodes.

Best,
Stefan








Re: Distribute DataSet to subset of nodes

Fabian Hueske-2

The custom partitioner does not know its task ID, but the mapper that assigns the partition IDs knows its own subtask ID.

So if the mapper with subtask ID 2 assigns partition IDs 2 and 7, only the record for partition 7 will be sent over the network.
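
A minimal sketch of that idea in plain Java, without any Flink dependency. It assumes an even parallelism, that the duplicating mapper runs with the same parallelism as the downstream mapper, and that a record whose partition ID equals the sender's subtask index stays local, as described above. The class and method names are hypothetical and only serve the illustration.

```java
import java.util.Arrays;

public class LocalFirstTargets {
    // Returns the two partition IDs for a record emitted by the duplicating
    // mapper running as subtask `ownIndex`. One ID is the mapper's own index
    // (expected to stay local), the other is the same offset in the other half.
    // Assumption: subtasks 0..parallelism/2-1 hold the first half of the data,
    // the remaining subtasks hold the second half.
    static int[] targets(int ownIndex, int parallelism) {
        int half = parallelism / 2;
        int other = (ownIndex + half) % parallelism;
        // Always list the first-half ID first, the second-half ID second.
        return ownIndex < half
                ? new int[] {ownIndex, other}
                : new int[] {other, ownIndex};
    }

    public static void main(String[] args) {
        // Subtask 2 keeps one copy for itself and sends the other to subtask 7.
        System.out.println(Arrays.toString(targets(2, 10)));
        // Subtask 7 keeps one copy for itself and sends the other to subtask 2.
        System.out.println(Arrays.toString(targets(7, 10)));
    }
}
```

In a real job the own index would come from getRuntimeContext().getIndexOfThisSubtask() inside the duplicating mapper; whether the local copy really avoids the network depends on how the runtime places the subtasks.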

On Sep 21, 2015 6:56 PM, "Stefan Bunk" <[hidden email]> wrote:
Hi Fabian,

that sounds good, thank you.

One final question: As I said earlier, this also distributes data in some unnecessary cases, say ID 4 sends data to ID 3.
Is there no way to find out the ID of the current node? I guess that number is already available on the node and just needs to be exposed somehow, right?

Cheers
Stefan



On 17 September 2015 at 18:39, Fabian Hueske <[hidden email]> wrote:
Hi Stefan,

I think I have a solution for your problem :-)

1) Distribute both parts of the small data to each machine (you have done that).
2) Your mapper should have a parallelism of 10. The tasks with ID 0 to 4 (get the ID via RichFunction.getRuntimeContext().getIndexOfThisSubtask()) read the first half; tasks 5 to 9 read the second half.
3) Feed the large input into a FlatMapper which emits two records for each incoming record, assigning the first outgoing record a task ID in the range 0 to 4 and the second a task ID in the range 5 to 9.
4) Put a custom partitioner (DataSet.partitionCustom()) after the duplicating mapper, which partitions the records based on the assigned task ID before they go into the mapper that loads the other, smaller data set. A record with assigned task ID 0 will be sent to the mapper task with subtask index 0.
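
Steps 2 and 4 can be sketched in plain Java as follows. This is only an illustration under stated assumptions, not a complete Flink job: the class and method names are hypothetical, the file names "1st" and "2nd" are borrowed from the earlier message in this thread, and partition() merely mirrors the shape of Flink's Partitioner.partition(key, numPartitions).

```java
public class HalfRouting {
    // Step 2: subtasks in the lower half of the index range load the first
    // file, the remaining subtasks load the second file.
    static String fileForSubtask(int subtaskIndex, int parallelism) {
        return subtaskIndex < parallelism / 2 ? "1st" : "2nd";
    }

    // Step 4: the task ID assigned by the duplicating mapper is used directly
    // as the partition index, so a record tagged 0 reaches subtask 0.
    static int partition(int assignedTaskId, int numPartitions) {
        if (assignedTaskId < 0 || assignedTaskId >= numPartitions) {
            throw new IllegalArgumentException("task ID out of range: " + assignedTaskId);
        }
        return assignedTaskId;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            System.out.println("subtask " + i + " loads " + fileForSubtask(i, 10));
        }
    }
}
```

Note that this only routes records to subtasks. It does not control which machine a subtask runs on, so two subtasks may still end up on the same node.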

This setup is not very nice, but should work.

Let me know, if you need more detail.

Cheers, Fabian

2015-09-16 21:44 GMT+02:00 Stefan Bunk <[hidden email]>:
Hi Fabian,

the local file problem would not exist, however, if I just copy both halves to all nodes, right?

Let's say I have a file `1st` and a file `2nd`, which I copy to all nodes.
Now, with your approach from above, I do:

// helper broadcast datasets to know on which half to operate
// (fromElements creates a one-element DataSet[String]; fromCollection expects a collection)
val data1stHalf = env.fromElements("1st")
val data2ndHalf = env.fromElements("2nd")

val mapped1 = data.flatMap(yourMap).withBroadcastSet(data1stHalf, "fileName").setParallelism(5)
val mapped2 = data.flatMap(yourMap).withBroadcastSet(data2ndHalf, "fileName").setParallelism(5)
val result = mapped1.union(mapped2)

Then, in my custom operator implementation of flatMap, I check the helper broadcast data to know which file to load:
override def open(params: Configuration): Unit = {
  // getBroadcastVariable returns a java.util.List, so use get(0)
  val fileName = getRuntimeContext.getBroadcastVariable[String]("fileName").get(0)
  // read the file from the local filesystem which I copied there earlier
  this.data = loadFromFileIntoDatastructure("/home/data/" + fileName)
}
override def flatMap(document: Input, out: Collector[Output]): Unit = {
  // do sth. with this.data and the input document
  out.collect(this.data.process(document))
}

I think this should work, or do you see another problem here?

Which brings us to the other question:
Both halves are so large that one half of the data fits in the memory left to the user on a node, but not both halves. So my program would probably crash with an out-of-memory error if the scheduler trusts one node so much that it wants to execute two flatMaps there ;-).

You are saying that it is not guaranteed that all 10 nodes are used, but how likely is it that one node is given two flatMaps while another one is basically idling? I have no idea of the internals, but I guess there is some heuristic inside which decides how to distribute. In the normal setup, where all 10 nodes are up, the connection is good, all nodes have the same resources available, and the input data is evenly distributed in HDFS, the default case should be to distribute to all 10 nodes, right?

I am not running in production, so it would be OK for me if this usually works out.

Cheers
Stefan



Re: Distribute DataSet to subset of nodes

Stefan Bunk
Of course!


Re: Distribute DataSet to subset of nodes

Stefan Bunk
Hi Fabian,

I implemented your approach from above. However, the runtime decides to run two subtasks on the same node, resulting in an out-of-memory error.
I thought partitioning really does partition the data to nodes, but now it seems like it's partitioning to tasks, and tasks can be on the same machine, resulting in no partitioning at all.
Is that correct?

This happens even if I run with parallelism 2. Is there some way to control this? Or to run node-local initialization?

Cheers
Stefan
