scaling flink

scaling flink

Bill Sparks
Hi.

I'm running some comparisons between Flink, MRv2, and Spark (1.3) using the new Intel HiBench suite. I've started with the stock wordcount example, and the numbers I'm seeing are not what I expected.

So my question is: which configuration parameters affect performance? Is there a performance/tuning guide?

Hardware-wise, we have 48 Haswell nodes, each with 32 physical (64 hyper-threaded) cores and 128 GB of memory, connected via FDR. I'm parsing 2 TB of text with the following parameters:

./bin/flink run -m yarn-cluster \
-yD fs.overwrite-files=true \
-yD fs.output.always-create-directory=true \
-yq \
-yn $((666)) \
-yD taskmanager.numberOfTaskSlots=$((1)) \
-yD parallelization.degree.default=$((666)) \
-ytm $((4*1024)) \
-yjm $((4*1024)) \
./examples/flink-java-examples-0.9-SNAPSHOT-WordCount.jar \
hdfs:///user/jsparks/HiBench/Wordcount/Input \
hdfs:///user/jsparks/HiBench/Wordcount/Output

Any pointers would be greatly appreciated.

Type                 Date        Time      Input_data_size  Duration(s)  Throughput(bytes/s)  Throughput/node
HadoopWordcount      2015-06-03  10:45:11  2052360935068    763.106      2689483420           2689483420
JavaSparkWordcount   2015-06-03  10:55:24  2052360935068    411.246      4990591847           4990591847
ScalaSparkWordcount  2015-06-03  11:06:24  2052360935068    342.777      5987452294           5987452294
flinkWordCount       2015-06-04  16:27:27  2052360935068    647.383      3170242244           66046713
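As a sanity check on the table, the throughput columns can be re-derived from the input size and the duration. The sketch below assumes the report computes Throughput as Input_data_size / Duration, truncated to whole bytes per second, and Throughput/node as Throughput divided by the node count. With 48 nodes this reproduces the flinkWordCount row, while the Hadoop and Spark rows look as if a node count of 1 was used, so their per-node column is not directly comparable. The class and method names are illustrative only.

```java
// Re-derives the HiBench throughput columns from the table above.
// Assumption: Throughput = Input_data_size / Duration (truncated), and
// Throughput/node = Throughput / number of nodes (integer division).
public class HiBenchThroughput {

    /** Bytes per second, truncated toward zero like the report's integers. */
    static long throughput(long inputBytes, double durationSeconds) {
        return (long) (inputBytes / durationSeconds);
    }

    /** Per-node throughput for a given node count. */
    static long perNode(long throughput, int nodes) {
        return throughput / nodes;
    }

    public static void main(String[] args) {
        long input = 2052360935068L; // Input_data_size, identical for all runs
        String[] jobs = {"HadoopWordcount", "JavaSparkWordcount", "ScalaSparkWordcount", "flinkWordCount"};
        double[] seconds = {763.106, 411.246, 342.777, 647.383};
        for (int i = 0; i < jobs.length; i++) {
            long tp = throughput(input, seconds[i]);
            // 48 nodes, as in the hardware description
            System.out.printf("%-20s %12d bytes/s  %10d bytes/s/node%n", jobs[i], tp, perNode(tp, 48));
        }
    }
}
```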

 
-- 
Jonathan (Bill) Sparks
Software Architecture
Cray Inc.

Re: scaling flink

Stephan Ewen
Hi Bill!

For the WordCount case, these numbers are not unexpected. Flink does not yet use a hash aggregator for the "reduce(v1, v2)" call; it uses a sort-based aggregation instead. Flink's sort aggregations are very reliable and scale better than many hash aggregations, but they are often more expensive. Hash aggregations outperform sort aggregations especially on data sets with low key cardinality.

A reliable managed-memory hash aggregator is on the roadmap. For now, Flink's runtime has managed-memory sorts and hash joins, and we chose reliability over performance.
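To make the sort-versus-hash distinction concrete, here is a minimal in-memory sketch of the two grouping strategies applied to WordCount's reduce step. It is plain Java, not Flink's managed-memory operators: the sort variant orders all n words and then counts runs of equal adjacent keys (O(n log n)), while the hash variant makes a single pass and keeps one counter per distinct key. With few distinct words the hash table stays small and cache-friendly, which is why hash aggregation tends to win on low-cardinality data; the sort-based approach, in turn, is the one that can spill sorted runs to disk when memory runs out (not modeled here).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain in-memory illustration of two grouping strategies; not Flink code.
public class AggregationStrategies {

    /** Sort-based: sort all words, then count runs of equal adjacent keys. */
    static Map<String, Integer> sortAggregate(List<String> words) {
        List<String> sorted = new ArrayList<>(words);
        Collections.sort(sorted);
        Map<String, Integer> counts = new LinkedHashMap<>(); // keeps sorted order
        int i = 0;
        while (i < sorted.size()) {
            String key = sorted.get(i);
            int run = 0;
            // Equal keys are adjacent after sorting, so one scan per run suffices.
            while (i < sorted.size() && sorted.get(i).equals(key)) {
                run++;
                i++;
            }
            counts.put(key, run);
        }
        return counts;
    }

    /** Hash-based: single pass, one running counter per distinct key. */
    static Map<String, Integer> hashAggregate(List<String> words) {
        Map<String, Integer> counts = new HashMap<>();
        for (String w : words) {
            counts.merge(w, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> words = List.of("to", "be", "or", "not", "to", "be");
        System.out.println(sortAggregate(words));
        System.out.println(hashAggregate(words));
    }
}
```

Both methods return the same counts; only the cost profile differs.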

It is cool to see that you are doing an evaluation, and we are very curious about the outcome. Please let us know how it looks for other operations and patterns, such as joins and iterations.



Concerning performance tuning, here are a few pointers that may be interesting:

  - You are using a lot of very small TaskManagers, each with one slot. It will most likely be faster if you use fewer TaskManagers with more slots, because then the network stack is shared between more tasks. This results in fewer TCP connections, which each carry more data. You could try "-yn $((111)) -ytm $((24*1024)) -yD taskmanager.numberOfTaskSlots=$((6))" for example.

  - The example WordCount implementation is not particularly tuned; I think one can do better there.

  - Flink has an object-reuse mode, which takes some pressure off the garbage collector. As long as the user code does not cache the objects it receives, enabling it may reduce the garbage-collection load that the user code creates.
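The first suggestion keeps total parallelism and memory unchanged while cutting the number of TaskManagers, which the sketch below checks. It uses a deliberately simplified, hypothetical model: containers spread evenly over the 48 nodes, and an all-to-all shuffle opening roughly one TCP connection per ordered pair of distinct TaskManagers. Flink's actual connection management may differ, so treat the connection counts as an order-of-magnitude comparison only.

```java
// Hypothetical model: containers spread evenly over 48 nodes, and an
// all-to-all shuffle opening about one TCP connection per ordered pair of
// distinct TaskManagers. Not Flink's exact connection logic.
public class TaskManagerLayout {

    static final int NODES = 48;
    static final long NODE_MEMORY_MB = 128L * 1024;

    static final class Layout {
        final String name;
        final int taskManagers; // -yn
        final int slotsPerTm;   // taskmanager.numberOfTaskSlots
        final int tmMemoryMb;   // -ytm

        Layout(String name, int taskManagers, int slotsPerTm, int tmMemoryMb) {
            this.name = name;
            this.taskManagers = taskManagers;
            this.slotsPerTm = slotsPerTm;
            this.tmMemoryMb = tmMemoryMb;
        }

        int totalSlots() { return taskManagers * slotsPerTm; }

        long totalMemoryMb() { return (long) taskManagers * tmMemoryMb; }

        // n * (n - 1) ordered pairs of distinct TaskManagers
        long shuffleConnections() { return (long) taskManagers * (taskManagers - 1); }

        // Ceiling division: TaskManagers on the most loaded node
        int maxTmsPerNode() { return (taskManagers + NODES - 1) / NODES; }
    }

    public static void main(String[] args) {
        Layout[] layouts = {
            new Layout("original  (-yn 666, 1 slot, 4 GB)", 666, 1, 4 * 1024),
            new Layout("suggested (-yn 111, 6 slots, 24 GB)", 111, 6, 24 * 1024),
        };
        for (Layout l : layouts) {
            System.out.printf("%s: %d slots, %d MB total, ~%d connections, up to %d TMs/node (%d of %d MB)%n",
                    l.name, l.totalSlots(), l.totalMemoryMb(), l.shuffleConnections(),
                    l.maxTmsPerNode(), (long) l.maxTmsPerNode() * l.tmMemoryMb, NODE_MEMORY_MB);
        }
    }
}
```

Under this model both layouts offer 666 slots and 2,727,936 MB in total, while the number of pairwise connections drops from 442,890 to 12,210.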

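Object reuse only helps when user code does not hold on to the records it is handed, because the runtime may overwrite the same instance with the next record. The sketch below illustrates this with hypothetical stand-in classes (MutableCount, ReusingIterator), not Flink types: caching references yields the last record repeated, while copying before caching keeps the correct values. In Flink's Java API the mode is, to our knowledge, switched on with env.getConfig().enableObjectReuse() on the ExecutionEnvironment; verify that against the version in use.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical stand-ins, not Flink classes: an iterator that reuses one
// mutable record, as an object-reusing runtime might.
public class ObjectReuseDemo {

    /** Mutable (word, count) record. */
    static final class MutableCount {
        String word;
        int count;

        MutableCount copy() {
            MutableCount c = new MutableCount();
            c.word = word;
            c.count = count;
            return c;
        }
    }

    /** Returns the SAME instance from every next(), overwriting its fields. */
    static final class ReusingIterator implements Iterator<MutableCount> {
        private final String[] words;
        private final MutableCount reuse = new MutableCount();
        private int pos = 0;

        ReusingIterator(String... words) { this.words = words; }

        @Override public boolean hasNext() { return pos < words.length; }

        @Override public MutableCount next() {
            if (!hasNext()) throw new NoSuchElementException();
            reuse.word = words[pos++];
            reuse.count = 1;
            return reuse; // no allocation per record, hence less GC work
        }
    }

    /** Unsafe with reuse: stores references, which all point to one object. */
    static List<String> cacheReferences(Iterator<MutableCount> it) {
        List<MutableCount> kept = new ArrayList<>();
        while (it.hasNext()) kept.add(it.next());
        return words(kept);
    }

    /** Safe with reuse: copies each record before keeping it. */
    static List<String> cacheCopies(Iterator<MutableCount> it) {
        List<MutableCount> kept = new ArrayList<>();
        while (it.hasNext()) kept.add(it.next().copy());
        return words(kept);
    }

    private static List<String> words(List<MutableCount> records) {
        List<String> out = new ArrayList<>();
        for (MutableCount r : records) out.add(r.word);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(cacheReferences(new ReusingIterator("a", "b", "c")));
        System.out.println(cacheCopies(new ReusingIterator("a", "b", "c")));
    }
}
```

Running main prints [c, c, c] followed by [a, b, c].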

BTW: Are you including the YARN startup time, or are you measuring from when the program execution starts?


Please pig us if you have more questions!


Greetings,
Stephan


On Fri, Jun 5, 2015 at 5:16 PM, Bill Sparks <[hidden email]> wrote:
[quoted text hidden]

Re: scaling flink

Stephan Ewen
It was supposed to mean "please PING us" ;-)

On Fri, Jun 5, 2015 at 7:21 PM, Stephan Ewen <[hidden email]> wrote:
[quoted text hidden]

Re: scaling flink

Bill Sparks
I did the translation, thanks. The code is rerunning.

Are there any network settings that can affect performance? As my email suggests, we are running on a Cray system; its bigger brother uses a proprietary network, so any tuning hints might make a difference.

Thanks again.
   Bill
-- 
Jonathan (Bill) Sparks
Software Architecture
Cray Inc.

From: Stephan Ewen <[hidden email]>
Reply-To: "[hidden email]" <[hidden email]>
Date: Friday, June 5, 2015 12:48 PM
To: "[hidden email]" <[hidden email]>
Subject: Re: scaling flink

[quoted text hidden]

Re: scaling flink

Stephan Ewen
I think the main network setting is the number of network buffers, which must be high enough for the shuffle to run smoothly.
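For sizing the network buffers, the Flink configuration documentation of that era gave, as we recall it, a per-TaskManager rule of thumb of slotsPerTM^2 * numberOfTMs * 4 buffers. The sketch below applies that heuristic to both layouts discussed in this thread and converts the result to memory assuming 32 KiB per buffer. Both the formula and the buffer size are assumptions to check against the documentation for the version you run.

```java
// Assumptions to verify for your Flink version: the per-TaskManager heuristic
// buffers = slotsPerTM^2 * numberOfTMs * 4, and 32 KiB per network buffer.
public class NetworkBufferSizing {

    static final long BUFFER_BYTES = 32L * 1024;

    /** Rule-of-thumb buffer count for one TaskManager. */
    static long suggestedBuffers(int slotsPerTm, int taskManagers) {
        return (long) slotsPerTm * slotsPerTm * taskManagers * 4;
    }

    /** Memory taken by that many buffers, in bytes. */
    static long bufferMemoryBytes(long buffers) {
        return buffers * BUFFER_BYTES;
    }

    public static void main(String[] args) {
        int[][] layouts = {{1, 666}, {6, 111}}; // {slots per TM, number of TMs}
        for (int[] l : layouts) {
            long n = suggestedBuffers(l[0], l[1]);
            System.out.printf("%d slot(s) x %d TMs: %d buffers, %.1f MiB per TaskManager%n",
                    l[0], l[1], n, bufferMemoryBytes(n) / (1024.0 * 1024.0));
        }
    }
}
```

The result could then be passed like the other -yD settings in the original command, for example -yD taskmanager.network.numberOfBuffers=15984, assuming that configuration key for this release.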

Flink uses the Netty library for its network stack. By default it starts 2 * cores network threads, which is usually fine. If you run many containers on each machine and each container sees a lot of cores, this may start too many network threads.
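To see how the 2 * cores default can add up, the sketch below estimates the network threads on the busiest node. It assumes, hypothetically, that each container's JVM sees all 64 hardware threads of its node (no CPU limits applied) and that containers are spread evenly across the 48 nodes. Under those assumptions the 666-container layout puts 14 containers, or 1,792 threads, on some nodes, against 3 containers (384 threads) for the 111-container layout.

```java
// Hypothetical model: each container's JVM sees all 64 hardware threads of
// its node, and containers are spread evenly over 48 nodes.
public class NettyThreadEstimate {

    /** Default described in the thread: 2 network threads per visible core. */
    static int threadsPerContainer(int visibleCores) {
        return 2 * visibleCores;
    }

    /** Network threads on the node that hosts the most containers. */
    static int threadsOnBusiestNode(int containers, int nodes, int visibleCores) {
        int containersOnBusiestNode = (containers + nodes - 1) / nodes; // ceiling division
        return containersOnBusiestNode * threadsPerContainer(visibleCores);
    }

    public static void main(String[] args) {
        // What a JVM started on this machine reports as available cores.
        int local = Runtime.getRuntime().availableProcessors();
        System.out.println("local JVM: " + local + " cores -> " + threadsPerContainer(local) + " threads");
        System.out.println("666 containers: " + threadsOnBusiestNode(666, 48, 64) + " threads on busiest node");
        System.out.println("111 containers: " + threadsOnBusiestNode(111, 48, 64) + " threads on busiest node");
    }
}
```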

As far as I know, the JVM and Netty can benefit from kernel-bypass network drivers, but that is not Flink-specific.

Stephan



On Fri, Jun 5, 2015 at 7:54 PM, Bill Sparks <[hidden email]> wrote:
[quoted text hidden]