Flink performance tuning


Serhiy Boychenko

Hey,

 

I have successfully integrated Flink into our very small test cluster (3 machines with 8 cores, 8 GBytes of memory and 2x1 TB disks each). Basically, I started the session to use YARN as the resource manager, and the data is read from HDFS.

/yarn-session.sh -n 21 -s 1 -jm 1024 -tm 1024
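
For reference, my understanding of the flags:

# -n 21    -> request 21 YARN containers, i.e. 21 TaskManagers
# -s 1     -> 1 task slot per TaskManager
# -jm 1024 -> JobManager memory in MB
# -tm 1024 -> TaskManager memory in MB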

 

My code is very simple: a flatMap is applied to the CSV data to extract the signal name and value, then I group by signal name and perform a group reduce in order to calculate the max, min and average of the collected values.

 

I have observed that on 3 nodes the average processing rate is around 11 MBytes/second. I compared the results with an MR execution (without any kind of tuning) and I am quite surprised, since Hadoop achieves 85 MBytes/second when executing the same query on the same data. I have read a few reports claiming that Flink performs better than MR and other tools, so I am wondering what is wrong. Any clue?

 

The processing rate is calculated according to the following formula:

Overall processing rate = (sum over jobs of the total data read) / (sum over jobs of the total time the job was running, including staging periods)
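
(To illustrate with made-up numbers: a single job that reads 9,000 MBytes and runs for 900 seconds, staging included, would count as 10 MBytes/second.)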

 

Best regards,

Serhiy.

Re: Flink performance tuning

rmetzger0
Hi,

Can you try running the job with 8 slots, 7 GB (maybe you need to go down to 6 GB) and only three TaskManagers (-n 3)?

I'm suggesting this because you have many small JVMs running on your machines. On such small machines you can probably get much more use out of your available memory by running a few big TaskManagers (which can share all the common management infrastructure).
Another plus of running fewer JVMs is that you reduce network overhead, because communication can happen within the process and less network transfer is required.
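
For example, something along these lines (just a sketch; the exact -tm value depends on how much memory YARN gives you per container):

# 3 TaskManagers, 8 slots each, roughly 6 GB per TaskManager
yarn-session.sh -n 3 -s 8 -jm 1024 -tm 6144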

Another big factor for performance is the data types used. How do you represent your data in Flink? (Are you using the TupleX types, or POJOs?)
How do you select the key for the grouping?
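
For context, the two variants I mean look roughly like this (field and class names are made up):

// Tuple variant: the key is selected by position
DataSet<Tuple2<String, Double>> tuples = ...;
tuples.groupBy(0);

// POJO variant: the key is selected by field name
DataSet<SignalRecord> records = ...;
records.groupBy("signalName");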

Regards,
Robert


Re: Flink performance tuning

Stephan Ewen
One issue may be that the selection of YARN containers is not HDFS locality aware here.
Hence, Flink may read more splits remotely, whereas MR reads more splits locally.

RE: Flink performance tuning

Serhiy Boychenko
In reply to this post by rmetzger0

Cheerz,

 

Basically the data is stored in CSV format. The flatMap which I have implemented does the following:

// tokens[0] is the signal name, tokens[2] the signal value
String[] tokens = value.split(",");
out.collect(new Tuple2<String, Double>(tokens[0], Double.valueOf(tokens[2])));

 

The result calculation looks like:

DataSet<Tuple2<String, String>> statistics = rawData.flatMap(new VariableParser()).groupBy(0).reduceGroup(new ReduceStats());

 

ReduceStats implements GroupReduceFunction; it iterates over the group, adds the values into a DescriptiveStatistics instance and at the end outputs the min, max and avg.
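
Roughly like this, if it helps (a simplified sketch of ReduceStats; the output formatting in my actual code differs a bit):

import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class ReduceStats
        implements GroupReduceFunction<Tuple2<String, Double>, Tuple2<String, String>> {

    @Override
    public void reduce(Iterable<Tuple2<String, Double>> values,
                       Collector<Tuple2<String, String>> out) {
        // collect all values of the group into a Commons Math DescriptiveStatistics
        DescriptiveStatistics stats = new DescriptiveStatistics();
        String signal = null;
        for (Tuple2<String, Double> value : values) {
            signal = value.f0;        // the group key (signal name)
            stats.addValue(value.f1); // the signal value
        }
        out.collect(new Tuple2<>(signal,
                "min=" + stats.getMin() + ", max=" + stats.getMax() + ", avg=" + stats.getMean()));
    }
}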

 

I ran the new experiments with the suggested configuration and what I have noticed is that only one task slot is being occupied. Something I am doing is wrong..

Task Managers: 3
Task Slots: 21
Available Task Slots: 20

 

 

Best regards,

Serhiy.

 

Re: Flink performance tuning

rmetzger0
Hi,

Flink is not using all available slots by default. You have to pass the parallelism as a parameter ("-p 21") when submitting the job.
This might also explain the performance difference compared to MapReduce.
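
For example, when submitting the job to the session (the jar name here is just a placeholder):

./bin/flink run -p 21 your-analysis-job.jar

You can also set it in the program itself via setParallelism(21) on the ExecutionEnvironment.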

The datatypes you are using look okay. I don't see a performance issue there.

Regards,
Robert

