repartition locally to task manager


Ventura Del Monte
Hello,

I am trying to introduce a new feature in my Flink project: I would like to shuffle (randomly repartition) my dataset only locally within a task manager, so that each internal worker gets a different set of objects to work on. I have looked at Flink's internal mechanisms, and I know (I hope) how it handles partitions. I think there are two ways to do it:

a) Using a mapPartition, which for each input object X outputs a tuple (X, destinationChannel), where destinationChannel is the id of the worker that will receive X. The main problem with this solution is determining the correct destinationChannel inside the mapPartition task. I think every operation in Flink is unaware of the task manager on which it is executed, so I would need to read the task manager config to get the number of slots available on the current TM. But then how should I relate this number to the total channel count? I could have a situation like this:

+----+----+----+----+----+----+----+----+----+---+---+---+---+----+
|    |    |    |    |    |    |    |    |    |   |   |   |   |    |
| 0  | 1  | 2  | 3  | 4  | 5  |  6 |  7 |  8 | 9 | 10| 11| 12| 13 |
+----+----+----+----+----+----+----+----+----+---+---+---+---+----+
|                   |                            |                |
|      TM1          |            TM2             |       TM3      |
+-------------------+----------------------------+----------------+

So even if I knew that TM2 had 6 slots, I would not be able to know their id range, [4,9].
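As a minimal sketch of the channel selection in (a): if the ids of the channels local to a task manager were somehow known (which is exactly the open problem here), picking a destination would be a uniform random choice among them. The class name `LocalChannelPicker` and the hard-coded id list are illustrative assumptions, not part of Flink.

```java
import java.util.Random;

/** Sketch only: picks a random destination channel among the channels local to one TaskManager. */
final class LocalChannelPicker {
    // Assumed to be known in advance, e.g. {4, 5, 6, 7, 8, 9} for TM2 in the diagram above.
    private final int[] localChannels;
    private final Random rng;

    LocalChannelPicker(int[] localChannels, long seed) {
        if (localChannels.length == 0) {
            throw new IllegalArgumentException("no local channels");
        }
        this.localChannels = localChannels.clone();
        this.rng = new Random(seed);
    }

    /** Returns one of the local channel ids, chosen uniformly at random. */
    int nextChannel() {
        return localChannels[rng.nextInt(localChannels.length)];
    }

    public static void main(String[] args) {
        LocalChannelPicker tm2 = new LocalChannelPicker(new int[] {4, 5, 6, 7, 8, 9}, 42L);
        for (int i = 0; i < 5; i++) {
            System.out.println("record " + i + " -> channel " + tm2.nextChannel());
        }
    }
}
```

Because the picker works on an explicit list of ids rather than on a count, it would also cover the case where the local channels are not a contiguous range.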

b) Destination channels are chosen in RegularPactTask.getOutputCollector, so modifying this method would make the local repartition possible, using either a range or a custom partitioning made taskmanager-aware. Yet this would involve some edits to the Flink runtime.

To be honest, I would like to avoid (b), but I think I am at a dead end and will have to edit the runtime.

Do you have better suggestions? Thank you in advance.

Re: repartition locally to task manager

Stephan Ewen
Hi Ventura!

Sorry for the late response. Here are a few ideas or comments that may help you:

1) We want to make it possible for a function (such as MapFunction) to figure out on which TaskManager it is running. The mechanism would be something like "getRuntimeContext().getTaskManagerInformation()". That should help you determine which TaskManager you are on.

2) When tasks are scheduled, it is not guaranteed that slots 0, 1, 2, ... are on the same TaskManager. The assignment is based on the locality of the input data stream and the availability of slots.


Can you explain a bit more what the feature you want to add actually tries to achieve? Then I may be able to give you more pointers.

When you say that you need local re-distribution, do you mean something like the picture below, where a change of parallelism between operators causes the data to be repartitioned only locally (not across the boundaries of TaskManagers)?


 (map) (map)  (map) (map)
   \     /      \    /
    \   /        \  /
   (reduce)    (reduce)
      ^ ^        ^ ^
      | \        / |
      |  +------+  |
      | /        \ |
   (source)     (source) 
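One way to read the upper half of the diagram (each reduce feeding only "its" maps) is as a fixed fan-out from each upstream subtask to a contiguous block of downstream subtasks. The sketch below only illustrates that index arithmetic, under the assumption that the downstream parallelism is a multiple of the upstream one; the class and method names are made up for this example, and whether the connected subtasks actually end up on the same TaskManager is a scheduling matter that the code does not address.

```java
import java.util.Arrays;

/** Sketch only: which downstream subtasks an upstream subtask feeds under a fixed local fan-out. */
final class LocalRedistribution {

    /**
     * Returns the downstream subtask indexes fed by upstream subtask {@code up},
     * assuming downstreamParallelism is a multiple of upstreamParallelism.
     */
    static int[] targets(int up, int upstreamParallelism, int downstreamParallelism) {
        if (upstreamParallelism <= 0 || downstreamParallelism % upstreamParallelism != 0) {
            throw new IllegalArgumentException("downstream parallelism must be a multiple of upstream");
        }
        if (up < 0 || up >= upstreamParallelism) {
            throw new IllegalArgumentException("upstream index out of range: " + up);
        }
        int fanOut = downstreamParallelism / upstreamParallelism;
        int[] result = new int[fanOut];
        for (int i = 0; i < fanOut; i++) {
            result[i] = up * fanOut + i;
        }
        return result;
    }

    public static void main(String[] args) {
        // Two reduce subtasks feeding four map subtasks, as in the diagram.
        System.out.println(Arrays.toString(targets(0, 2, 4))); // prints [0, 1]
        System.out.println(Arrays.toString(targets(1, 2, 4))); // prints [2, 3]
    }
}
```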



Greetings,
Stephan



On Fri, May 22, 2015 at 10:58 AM, Ventura Del Monte <[hidden email]> wrote:
[...]

Re: repartition locally to task manager

Ventura Del Monte
Hi Stephan,

Many thanks for your reply!

1) This would be a nice feature. I have already done something similar; if you tell me which information you would like to expose in the runtime context, I could add it to my code, update the unit tests and share them.

2) Yes, I have figured that out. However, I needed this kind of local repartition because I was working on a dataset sampler based on the filter operator (this is the first step of the iterative pipeline I am developing). To be honest, this repartition is just a plus, because I have already achieved good results (although a sampler like the one Spark offers for low ratios would be a good feature). The main drawback of this filter operation is that it always takes the same partition as input. If the partition is big enough, the probability of sampling different items in consecutive filtering operations should be high (given, of course, a good sampling factor and a correctly seeded RNG). Yet if it were possible to shuffle the partitions on the same task manager, the following sampling operation would benefit, in my opinion, since the produced partition would contain different items with an even higher probability. Of course, I think this shuffle operation (being local to each TM) should involve neither a network nor a disk transfer; otherwise, the game is not worth the candle.
About the change of parallelism, I read that it triggers a sort of local re-distribution, but I do not think it applies to my case. Anyway, do you think this kind of shuffling/sampling can be achieved in Flink? Does it make sense in your opinion?
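To make the filter-based sampling concrete, here is a minimal sketch of a Bernoulli sampler over a single partition, written as plain Java rather than as a Flink filter function. The class name, the `sample` signature and the seeds are assumptions for illustration only; the real sampler in the pipeline may differ. Each item is kept independently with probability `fraction`, and a different seed per round gives a different sample from the same partition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** Sketch only: Bernoulli sampling of one partition with an explicitly seeded RNG. */
final class BernoulliSampler {

    /** Keeps each item independently with probability {@code fraction} (0.0 to 1.0). */
    static <T> List<T> sample(List<T> partition, double fraction, long seed) {
        if (fraction < 0.0 || fraction > 1.0) {
            throw new IllegalArgumentException("fraction must be in [0, 1]");
        }
        Random rng = new Random(seed);
        List<T> out = new ArrayList<>();
        for (T item : partition) {
            // nextDouble() is in [0, 1), so fraction 1.0 keeps everything and 0.0 keeps nothing.
            if (rng.nextDouble() < fraction) {
                out.add(item);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> partition = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            partition.add(i);
        }
        // Two consecutive sampling rounds over the same partition, seeded differently.
        System.out.println("round 1: " + sample(partition, 0.1, 1L));
        System.out.println("round 2: " + sample(partition, 0.1, 2L));
    }
}
```

Since every round draws from the same fixed partition, the only source of variety is the RNG; a local shuffle between rounds would change which items each worker sees in the first place.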


Best Regards,
Ventura

2015-06-03 14:57 GMT+02:00 Stephan Ewen <[hidden email]>:
[...]

Re: repartition locally to task manager

Stephan Ewen
Hi Ventura!

Concerning (1) :

What would be good is to make the "org.apache.flink.runtime.instance.InstanceConnectionInfo" available in the RuntimeContext object returned by getRuntimeContext(). In order to do that, we would need to move it into the flink-core package. We could also rename it simply to "ConnectionInfo".
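As a rough sketch of how such information could be used: the `ConnectionInfo` class below is a hypothetical stand-in with just a hostname and a data port (it is not the actual InstanceConnectionInfo), and the example assumes the infos of all subtasks have been collected in one place, which nothing discussed so far provides. Under those assumptions, grouping subtask indexes by TaskManager yields the set of local peers for each subtask, even when the slot ids on one TaskManager are not contiguous. The host names and port in `main` are made-up values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for the connection info of one TaskManager. */
final class ConnectionInfo {
    final String hostname;
    final int dataPort;

    ConnectionInfo(String hostname, int dataPort) {
        this.hostname = hostname;
        this.dataPort = dataPort;
    }

    /** Identifies a TaskManager by host and port. */
    String key() {
        return hostname + ":" + dataPort;
    }
}

/** Sketch only: derives the co-located subtasks from per-subtask connection infos. */
final class LocalPeers {

    /** Groups subtask indexes by TaskManager; infos.get(i) belongs to subtask i. */
    static Map<String, List<Integer>> groupByTaskManager(List<ConnectionInfo> infos) {
        Map<String, List<Integer>> groups = new TreeMap<>();
        for (int i = 0; i < infos.size(); i++) {
            groups.computeIfAbsent(infos.get(i).key(), k -> new ArrayList<>()).add(i);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<ConnectionInfo> infos = Arrays.asList(
                new ConnectionInfo("host-a", 6121),
                new ConnectionInfo("host-b", 6121),
                new ConnectionInfo("host-a", 6121));
        // Subtasks 0 and 2 share a TaskManager, although their indexes are not adjacent.
        System.out.println(groupByTaskManager(infos));
    }
}
```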


Concerning (2) :

I think this may be a bit harder to add. I am curious what your results are without this optimization.


Stephan


On Mon, Jun 8, 2015 at 4:49 PM, Ventura Del Monte <[hidden email]> wrote:
[...]