scaling question


scaling question

Bill Sparks

Sorry for the post again. I guess I'm not understanding this… 

The question is how to scale up/speed up the execution of a problem. What I'm trying to do is get the best out of the available processors for a given node count and compare this against Spark, using KMeans.

For Spark, one method is to increase the number of executors and RDD partitions; for Flink, I can increase the number of task slots (taskmanager.numberOfTaskSlots). My empirical evidence suggests that just increasing the slots does not increase processing of the data. Is there something I'm missing? Much like re-partitioning your datasets in Spark, is there an equivalent option for Flink? What about the parallelism argument? The referring document seems to be broken…

This seems to be a dead link: https://github.com/apache/flink/blob/master/docs/setup/%7B%7Bsite.baseurl%7D%7D/apis/programming_guide.html#parallel-execution

If I do increase the parallelism to (taskManagers*slots), I hit the "Insufficient number of network buffers…" error.

I have 16 nodes (64 HT cores), and have run task slots at 1, 4, 8, and 16, and still the execution time is always around 5-6 minutes, using the default parallelism.

Regards,
    Bill
-- 
Jonathan (Bill) Sparks
Software Architecture
Cray Inc.

Re: scaling question

Maximilian Michels
Hi Bill,

You're right: simply increasing the task manager slots doesn't do anything by itself. It is correct to set the parallelism to taskManagers*slots. Just increase the number of network buffers in the flink-conf.yaml, e.g. to 4096. In the future, we will configure this setting dynamically.
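In flink-conf.yaml that would be the following entry (key name as in the 0.9-era configuration docs; 4096 is just an example value, not a recommendation for every setup):

```yaml
# Total number of network buffers available to each TaskManager
taskmanager.network.numberOfBuffers: 4096
```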

Let us know if your runtime decreases :)

Cheers,
Max

On Fri, Jun 19, 2015 at 4:24 PM, Bill Sparks <[hidden email]> wrote:



Re: scaling question

Ufuk Celebi
In reply to this post by Bill Sparks
Hey Bill!

On 19 Jun 2015, at 16:24, Bill Sparks <[hidden email]> wrote:

> Sorry for the post again. I guess I'm not understanding this…

Thanks for posting again, not sorry! ;-)

Regarding the broken link: where did you get this link? I think it should refer here: http://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#parallel-execution

For smaller machines, we usually suggest setting the number of slots per task manager to the number of cores, in your case 64. You will then have #taskManagers * #slotsPerTaskManager slots in your cluster, i.e. 1024. This should also be the parallelism you set for your program.
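Put together, a minimal flink-conf.yaml sketch for this setup might look as follows (key names as in the 0.9-era configuration docs; treat the exact values as assumptions to tune for your cluster):

```yaml
# One slot per (hyper-threaded) core, 64 per machine
taskmanager.numberOfTaskSlots: 64

# Default parallelism = #taskManagers * #slotsPerTaskManager = 16 * 64
parallelism.default: 1024
```

The same value can also be set per job, e.g. via setParallelism() on the ExecutionEnvironment, rather than cluster-wide.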

Regarding configuration of network buffers: http://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-the-network-buffers

With such a high core count per machine, we suggest giving the network layer quite some memory for concurrent shuffles. If you can afford it, go as high as a couple of GB. The formula in the docs suggests 262144 buffers, i.e. 8 GB.

If this is not an option, I would decrease the number of slots per task manager (e.g. to 32) and reserve fewer buffers for the network stack.
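To make the arithmetic above concrete, here is a small Python sketch of the rule-of-thumb sizing formula from the FAQ (#slots-per-TM^2 * #TMs * 4), assuming the default 32 KiB network buffer size. The helper name is mine, not Flink's:

```python
def network_buffers(slots_per_tm: int, num_tms: int, factor: int = 4) -> int:
    """Rule-of-thumb buffer count: slots-per-TM^2 * #TMs * 4."""
    return slots_per_tm ** 2 * num_tms * factor

BUFFER_SIZE = 32 * 1024  # default network buffer size in bytes

# 16 task managers with 64 slots each (Bill's cluster)
buffers = network_buffers(64, 16)
print(buffers)                        # 262144
print(buffers * BUFFER_SIZE // 2**30) # 8 (GiB)

# The fallback above: 32 slots per task manager
print(network_buffers(32, 16))        # 65536
```

This reproduces the 262144 buffers / 8 GB figure above, and shows why halving the slots per task manager cuts the buffer requirement by a factor of four.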

Feel free to post any time when something comes up.

– Ufuk

Re: scaling question

Fabian Hueske-2
In reply to this post by Maximilian Michels
Hi Bill,

No worries, questions are the purpose of this mailing list.

The number of network buffers is a parameter that needs to be scaled with your setup. The reason is Flink's pipelined data transfer, which requires a certain number of network buffers to be available at the same time during processing.

There is an FAQ entry that explains how to set this parameter according to your setup:
--> http://flink.apache.org/faq.html#i-get-an-error-message-saying-that-not-enough-buffers-are-available-how-do-i-fix-this

The documentation for parallel execution can be found here:
http://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#parallel-execution


If you are working on the latest snapshot, you can also configure Flink to use batched data transfer instead of pipelined transfer. This is done via ExecutionConfig.setExecutionMode(), which you obtain by calling getConfig() on your ExecutionEnvironment.

Best, Fabian


2015-06-19 16:31 GMT+02:00 Maximilian Michels <[hidden email]>:



Re: scaling question

Ufuk Celebi
PS: I've read your last email as 64 HT cores per machine. If it was in total over the 16 nodes, you have to adjust my response accordingly. ;)

On 19 Jun 2015, at 16:42, Fabian Hueske <[hidden email]> wrote:



Re: scaling question

Bill Sparks
To clarify: it's 64 HT cores per node, 16 nodes, each with 128 GB. Well, actually I have 48 nodes, but I'm trying to limit it so we have a comparison with Spark/MPI/MapReduce all at the same node count.

Thanks for the information.

--
Jonathan (Bill) Sparks
Software Architecture
Cray Inc.





On 6/19/15 9:44 AM, "Ufuk Celebi" <[hidden email]> wrote:

>PS: I've read your last email as 64 HT cores per machine. If it was in
>total over the 16 nodes, you have to adjust my response accordingly. ;)
>
>On 19 Jun 2015, at 16:42, Fabian Hueske <[hidden email]> wrote:
>
>> Hi Bill,
>>
>> no worry, questions are the purpose of this mailing list.
>>
>> The number network buffers is a parameter that needs to be scaled with
>>your setup. The reason for that is Flink's pipelined data transfer,
>>which requires a certain number of network buffers to be available at
>>the same time during processing.
>>
>> There is an FAQ entry that explains how to set this parameter according
>>to your setup:
>> -->
>>http://flink.apache.org/faq.html#i-get-an-error-message-saying-that-not-e
>>nough-buffers-are-available-how-do-i-fix-this
>>
>> The documentation for parallel execution can be found here:
>>
>>http://ci.apache.org/projects/flink/flink-docs-master/apis/programming_gu
>>ide.html#parallel-execution
>>
>> If you are working on the latest snapshot you can also configure Flink
>>to use batched data transfer instead of pipelined transfer. This is done
>>via the ExecutionConfig.setExecutionMode(), which you obtain by calling
>>getConfig() on your ExecutionEnvironment.
>>
>> Best, Fabian
>>
>>
>> 2015-06-19 16:31 GMT+02:00 Maximilian Michels <[hidden email]>:
>> Hi Bill,
>>
>> You're right. Simply increasing the task manager slots doesn't do
>>anything. It is correct to set the parallelism to taskManagers*slots.
>>Simply increase the number of network buffers in the flink-conf.yaml,
>>e.g. to 4096. In the future, we will configure this setting dynamically.
>>
>> Let us know if your runtime decreases :)
>>
>> Cheers,
>> Max
>>
>> On Fri, Jun 19, 2015 at 4:24 PM, Bill Sparks <[hidden email]> wrote:
>>
>> Sorry for the post again. I guess I'm not understanding thisŠ
>>
>> The question is how to scale up/increase the execution of a problem.
>>What  I'm trying to do, is get the best out of the available processors
>>for a given node count and compare this against spark, using KMeans.
>>
>> For spark,  one method is to increase the executors and RDD partitions
>>- for Flink I can increase the number of task slots
>>(taskmanager.numberOfTaskSlots). My empirical evidence suggests that
>>just increasing the slots does not increase processing of the data. Is
>>there something I'm missing? Much like spark with re-partitioning your
>>datasets, is there an equivalent option for flink? What about the
>>parallelism argument The referring document seems to be brokenŠ
>>
>> This seems to be a dead link:
>>https://github.com/apache/flink/blob/master/docs/setup/%7B%7Bsite.baseurl
>>%7D%7D/apis/programming_guide.html#parallel-execution
>>
>> If I do increase the parallelism to be (taskManagers*slots) I hit the
>>"Insufficient number of network buffersŠ"
>>
>> I have 16 nodes (64 HT cores), and have run TaskSlots from 1, 4, 8, 16
>>and still the execution time is always around 5-6 minutes, using the
>>default parallelism.
>>
>> Regards,
>>     Bill
>> --
>> Jonathan (Bill) Sparks
>> Software Architecture
>> Cray Inc.
>>
>>
>