Load balancing

Load balancing

Kruse, Sebastian

Hi folks,

 

I would like to do some load balancing within one of my Flink jobs to achieve good scalability. The rebalance() method is not applicable in my case, because the runtime is dominated by the processing of a few very large elements in my dataset. Hence, I need to distribute the processing work for these elements among the nodes in the cluster. To do so, I subdivide those elements into partial tasks and want to distribute these partial tasks to other nodes with a custom partitioner.

 

Now, my question is the following: I do not actually need to shuffle the complete dataset, only a few elements. So is there a way to tell, from within the partitioner, that the remaining data should stay on its current task manager? Thanks!
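
For concreteness, here is a rough sketch of what I am doing (only a sketch: MyElement, PartialTask, and splitIntoPartialTasks() are placeholders for my actual types and logic, and the round-robin key is just one possible assignment):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

DataSet<MyElement> elements = ...;  // placeholder: the input dataset

// Split every element into partial tasks and tag each partial task with a
// round-robin index, so the parts of a heavy element get different keys.
DataSet<Tuple2<Integer, PartialTask>> partialTasks = elements
    .flatMap(new FlatMapFunction<MyElement, Tuple2<Integer, PartialTask>>() {
        @Override
        public void flatMap(MyElement e, Collector<Tuple2<Integer, PartialTask>> out) {
            int i = 0;
            for (PartialTask t : splitIntoPartialTasks(e)) {  // placeholder helper
                out.collect(new Tuple2<>(i++, t));
            }
        }
    });

// Distribute the partial tasks over the parallel instances by key.
// Problem: this shuffles *every* record, although only the few partial tasks
// of the heavy elements actually need to move.
DataSet<Tuple2<Integer, PartialTask>> distributed = partialTasks
    .partitionCustom(new Partitioner<Integer>() {
        @Override
        public int partition(Integer key, int numPartitions) {
            return key % numPartitions;
        }
    }, 0);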

 

Cheers,

Sebastian


Re: Load balancing

Fabian Hueske
Hi Sebastian,

I agree, shuffling only specific elements would be a very useful feature, but unfortunately it's not supported (yet).
Would you like to open a JIRA for that?

Cheers, Fabian


Re: Load balancing

Gianmarco De Francisci Morales
We have been working on an adaptive load balancing strategy that would address exactly the issue you point out.
FLINK-1725 is the starting point for the integration.

Cheers,

--
Gianmarco


RE: Load balancing

Kruse, Sebastian

Hi Gianmarco,

 

Thanks for the pointer!

 

I had a quick look at the paper, but unfortunately I don’t see a connection to my problem. I have a batch job, and some elements in my dataset need processing time that is quadratic in their size. The largest ones, which cause higher-than-average load, should be split up and the splits distributed among the workers. Your paper says: “In principle, depending on the application, two different messages might impose a different load on workers. However, in most cases these differences even out and modeling such application-specific differences is not necessary.” Maybe I am missing something, but doesn’t this assumption render PKG inapplicable to my case? Objections are of course welcome :)

 

Cheers,

Sebastian

 


Re: Load balancing

Gianmarco De Francisci Morales
Hi Sebastian,

Maybe I misunderstood your problem.
Is the processing time quadratic in the size of the single element of the dataset?
Or is it quadratic in the number of elements of the dataset with a single key?
That is, is the element heavy, or is the key heavy?

In the second case you can use PKG.
In the first case, I don't think you really need any system-level help.
Given that you can split up the work for each element, you can just transform the dataset so that a single heavy element is replaced by a set of generated sub-elements, with the ID of the original element as the key.
Then you can process the sub-elements in parallel, and finally group by key to aggregate the result.
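
In Flink's DataSet API this could look roughly as follows (just a sketch: Element, Chunk, PartialResult, Result, and the split/process/merge helpers are placeholders for your actual types and logic):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

DataSet<Element> input = ...;  // placeholder: the input dataset

// 1) Replace each heavy element by sub-elements, keyed by the original element's ID.
DataSet<Tuple2<Long, Chunk>> subElements = input
    .flatMap(new FlatMapFunction<Element, Tuple2<Long, Chunk>>() {
        @Override
        public void flatMap(Element e, Collector<Tuple2<Long, Chunk>> out) {
            for (Chunk c : split(e)) {                 // placeholder: element-specific splitting
                out.collect(new Tuple2<>(e.getId(), c));
            }
        }
    });

// 2) Spread the sub-elements evenly and do the expensive per-chunk work in parallel.
DataSet<Tuple2<Long, PartialResult>> partials = subElements
    .rebalance()
    .map(new MapFunction<Tuple2<Long, Chunk>, Tuple2<Long, PartialResult>>() {
        @Override
        public Tuple2<Long, PartialResult> map(Tuple2<Long, Chunk> t) {
            return new Tuple2<>(t.f0, process(t.f1));  // placeholder: the quadratic computation
        }
    });

// 3) Group by the original element ID and merge the partial results.
DataSet<Result> results = partials
    .groupBy(0)
    .reduceGroup(new GroupReduceFunction<Tuple2<Long, PartialResult>, Result>() {
        @Override
        public void reduce(Iterable<Tuple2<Long, PartialResult>> values, Collector<Result> out) {
            out.collect(merge(values));                // placeholder: recombine the parts
        }
    });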

Cheers,

--
Gianmarco


RE: Load balancing

Kruse, Sebastian

Hi Gianmarco,

 

The processing time is quadratic in the size of the single elements. I was already applying the strategy you proposed, but I was trying to find out whether there is a way to balance the sub-items of these large elements over the workers without shuffling the whole dataset. However, now I know about the PKG strategy, and maybe it will come in handy somewhere else :)

So, thanks again for the pointers!

 

Cheers,

Sebastian

 
